OrchestKit v6.7.1 — 67 skills, 38 agents, 77 hooks with Opus 4.6 support

Python Backend

Python backend patterns for asyncio, FastAPI, SQLAlchemy 2.0 async, and connection pooling. Use when building async Python services, FastAPI endpoints, database sessions, or connection pool tuning.

Reference · medium

Primary Agent: backend-system-architect

Python Backend

Patterns for building production Python backends with asyncio, FastAPI, SQLAlchemy 2.0, and connection pooling. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

Category | Rules | Impact | When to Use
Asyncio | 3 | HIGH | TaskGroup, structured concurrency, cancellation handling
FastAPI | 3 | HIGH | Dependencies, middleware, background tasks
SQLAlchemy | 3 | HIGH | Async sessions, relationships, migrations
Pooling | 3 | MEDIUM | Database pools, HTTP sessions, tuning

Total: 12 rules across 4 categories

Quick Start

# FastAPI + SQLAlchemy async session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()

# Asyncio TaskGroup with timeout
async def fetch_all(urls: list[str]) -> list[dict]:
    async with asyncio.timeout(30):
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_url(url)) for url in urls]
    return [t.result() for t in tasks]

Asyncio

Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.

Key Patterns

  • TaskGroup replaces gather() with structured concurrency and auto-cancellation
  • asyncio.timeout() context manager for composable timeouts
  • Semaphore for concurrency limiting (rate-limit HTTP requests)
  • except* with ExceptionGroup for handling multiple task failures
  • asyncio.to_thread() for bridging sync code to async

Key Decisions

Decision | Recommendation
Task spawning | TaskGroup, not gather()
Timeouts | asyncio.timeout() context manager
Concurrency limit | asyncio.Semaphore
Sync bridge | asyncio.to_thread()
Cancellation | Always re-raise CancelledError
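
The sync-bridge decision above in a minimal, runnable form (blocking_io is a hypothetical stand-in for real blocking work):

```python
import asyncio
import time

def blocking_io() -> str:
    """Hypothetical blocking call - would stall the event loop if awaited directly."""
    time.sleep(0.01)
    return "done"

async def main() -> str:
    # to_thread runs the sync function in a worker thread,
    # keeping the event loop free for other tasks
    return await asyncio.to_thread(blocking_io)

result = asyncio.run(main())
```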

FastAPI

Production-ready FastAPI patterns for lifespan, dependencies, middleware, and settings.

Key Patterns

  • Lifespan with asynccontextmanager for startup/shutdown resource management
  • Dependency injection with class-based services and Depends()
  • Middleware stack: CORS -> RequestID -> Timing -> Logging
  • Pydantic Settings with .env and field validation
  • Exception handlers with RFC 7807 Problem Details

Key Decisions

Decision | Recommendation
Lifespan | asynccontextmanager (not events)
Dependencies | Class-based services with DI
Settings | Pydantic Settings with .env
Response | ORJSONResponse for performance
Health | Check all critical dependencies

SQLAlchemy

Async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.

Key Patterns

  • One AsyncSession per request with expire_on_commit=False
  • lazy="raise" on relationships to prevent accidental N+1 queries
  • selectinload for eager loading collections
  • Repository pattern with generic async CRUD
  • Bulk inserts chunked 1000-10000 rows for memory management

Key Decisions

Decision | Recommendation
Session scope | One AsyncSession per request
Lazy loading | lazy="raise" + explicit loads
Eager loading | selectinload for collections
expire_on_commit | False (prevents lazy load errors)
Pool | pool_pre_ping=True

Pooling

Database and HTTP connection pooling for high-performance async Python applications.

Key Patterns

  • SQLAlchemy pool with pool_size, max_overflow, pool_pre_ping
  • Direct asyncpg pool with min_size/max_size and connection lifecycle
  • aiohttp session with TCPConnector limits and DNS caching
  • FastAPI lifespan creating and closing pools at startup/shutdown
  • Pool monitoring with Prometheus metrics

Pool Sizing Formula

pool_size = (concurrent_requests / avg_queries_per_request) * 1.5

Anti-Patterns (FORBIDDEN)

# NEVER use gather() for new code - no structured concurrency
# NEVER swallow CancelledError - breaks TaskGroup and timeout
# NEVER block the event loop with sync calls (time.sleep, requests.get)
# NEVER use global mutable state for db sessions
# NEVER skip dependency injection (create sessions in routes)
# NEVER share AsyncSession across tasks (race condition)
# NEVER use sync Session in async code (blocks event loop)
# NEVER create engine/pool per request
# NEVER forget to close pools on shutdown
Related Skills

  • ork:architecture-patterns - Clean architecture and layer separation
  • ork:async-jobs - Celery/ARQ for background processing
  • streaming-api-patterns - SSE/WebSocket async patterns
  • ork:database-patterns - Database schema design

Rules (12)

Handle asyncio cancellation correctly for proper TaskGroup and timeout behavior — HIGH

Cancellation Handling

Proper Cancellation Pattern

async def cancellable_operation(resource_id: str) -> dict:
    """Properly handle cancellation - NEVER swallow CancelledError."""
    resource = await acquire_resource(resource_id)
    try:
        return await process_resource(resource)
    except asyncio.CancelledError:
        # Clean up but RE-RAISE - this is critical!
        await cleanup_resource(resource)
        raise  # ALWAYS re-raise CancelledError
    finally:
        await release_resource(resource)

Anti-Patterns

# NEVER swallow CancelledError - breaks structured concurrency
except asyncio.CancelledError:
    return None  # BREAKS TaskGroup and timeout!

# NEVER use create_task() without TaskGroup - tasks leak
asyncio.create_task(background_work())  # Fire and forget = leaked task

# NEVER yield inside async context managers (PEP 789)
async with asyncio.timeout(10):
    yield item  # DANGEROUS - cancellation bugs!

Key Principles

  • Always re-raise CancelledError after cleanup
  • Breaking this rule breaks TaskGroup and timeout behavior
  • Use try/finally for guaranteed resource cleanup
  • Never use bare create_task() outside a TaskGroup

Incorrect — Swallowing CancelledError breaks TaskGroup cancellation propagation:

async def broken_task():
    try:
        await long_running_operation()
    except asyncio.CancelledError:
        return None  # BUG: TaskGroup cannot cancel properly!

Correct — Re-raising CancelledError allows proper cleanup and cancellation flow:

async def proper_task():
    try:
        await long_running_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # CRITICAL: Propagate cancellation
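
The difference is observable in a small runnable sketch (sleep stands in for long_running_operation; the cleaned list records that cleanup ran):

```python
import asyncio

cleaned: list[bool] = []

async def proper_task() -> None:
    try:
        await asyncio.sleep(10)  # stand-in for long_running_operation()
    except asyncio.CancelledError:
        cleaned.append(True)     # cleanup happens...
        raise                    # ...and cancellation still propagates

async def main() -> str:
    task = asyncio.create_task(proper_task())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"       # caller observes the cancellation
    return "swallowed"           # would happen if CancelledError were eaten

outcome = asyncio.run(main())
```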

Use semaphores and sync bridges to prevent resource exhaustion and event loop blocking — HIGH

Structured Concurrency Patterns

Semaphore for Concurrency Limiting

class RateLimitedClient:
    """HTTP client with concurrency limiting."""

    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._session: aiohttp.ClientSession | None = None

    async def fetch(self, url: str) -> dict:
        async with self._semaphore:  # Limit concurrent requests
            async with self._session.get(url) as response:
                return await response.json()

    async def fetch_many(self, urls: list[str]) -> list[dict]:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(self.fetch(url)) for url in urls]
        return [t.result() for t in tasks]

Sync-to-Async Bridge

import asyncio
from concurrent.futures import ThreadPoolExecutor

# For CPU-bound or blocking sync code
async def run_blocking_operation(data: bytes) -> dict:
    """Run blocking sync code in thread pool."""
    return await asyncio.to_thread(cpu_intensive_parse, data)

# For sync code that needs async context
def sync_caller():
    """Call async code from sync context (not in existing loop)."""
    return asyncio.run(async_main())

# For sync code within existing async context
async def wrapper_for_sync_lib():
    """Bridge sync library to async - use with care."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, sync_blocking_call)
    return result

Key Principles

  • Use asyncio.Semaphore to prevent connection pool exhaustion
  • Use asyncio.to_thread() for clean sync-to-async bridging
  • Never call asyncio.run() inside an existing event loop
  • Never block the event loop with time.sleep() or requests.get()

Incorrect — Unlimited concurrent requests exhaust connection pools and memory:

async def fetch_all(urls: list[str]):
    tasks = [fetch_url(url) for url in urls]  # 10,000 concurrent!
    return await asyncio.gather(*tasks)

Correct — Semaphore limits concurrency to prevent resource exhaustion:

async def fetch_all(urls: list[str], max_concurrent: int = 10):
    sem = asyncio.Semaphore(max_concurrent)
    async def limited_fetch(url):
        async with sem:
            return await fetch_url(url)
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(limited_fetch(url)) for url in urls]
    return [t.result() for t in tasks]

Apply TaskGroup for structured concurrency with automatic cancellation on failure — HIGH

TaskGroup & Timeout Patterns

TaskGroup (Replaces gather)

import asyncio

async def fetch_user_data(user_id: str) -> dict:
    """Fetch user data concurrently - all tasks complete or all cancelled."""
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(user_id))
        orders_task = tg.create_task(fetch_orders(user_id))
        preferences_task = tg.create_task(fetch_preferences(user_id))

    # All tasks guaranteed complete here
    return {
        "user": user_task.result(),
        "orders": orders_task.result(),
        "preferences": preferences_task.result(),
    }

TaskGroup with Timeout

async def fetch_with_timeout(urls: list[str], timeout_sec: float = 30) -> list[dict]:
    """Fetch all URLs with overall timeout - structured concurrency."""
    async with asyncio.timeout(timeout_sec):
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_url(url)) for url in urls]

    return [t.result() for t in tasks]

Exception Group Handling

async def process_batch(items: list[dict]) -> tuple[list[dict], list[Exception]]:
    """Process batch, collecting both successes and failures."""
    tasks: list[asyncio.Task] = []
    errors: list[Exception] = []

    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(process_item(item)) for item in items]
    except* ValueError as eg:
        errors.extend(eg.exceptions)
    except* Exception as eg:
        errors.extend(eg.exceptions)

    # Tasks that finished before a sibling failed still hold their results
    results = [t.result() for t in tasks if not t.cancelled() and t.exception() is None]
    return results, errors

Key Principles

  • Use TaskGroup not gather() for all new code
  • Use asyncio.timeout() context manager for deadlines
  • Handle ExceptionGroup with except* for multiple failures
  • TaskGroup auto-cancels remaining tasks when one fails

Incorrect — gather() doesn't cancel remaining tasks when one fails:

results = await asyncio.gather(
    fetch_user(user_id),
    fetch_orders(user_id),
    fetch_preferences(user_id),
)  # If one fails, others keep running (resource leak)

Correct — TaskGroup auto-cancels all tasks when any fails (structured concurrency):

async with asyncio.TaskGroup() as tg:
    user_task = tg.create_task(fetch_user(user_id))
    orders_task = tg.create_task(fetch_orders(user_id))
    prefs_task = tg.create_task(fetch_preferences(user_id))
# If any fails, all are cancelled automatically

Configure FastAPI lifespan management to prevent resource leaks during startup and shutdown — HIGH

FastAPI Lifespan & Health

Lifespan Context Manager

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan with resource management."""
    # Startup
    app.state.db_engine = create_async_engine(
        settings.database_url, pool_size=5, max_overflow=10,
    )
    app.state.redis = redis.from_url(settings.redis_url)

    # Health check connections
    async with app.state.db_engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    await app.state.redis.ping()

    yield  # Application runs

    # Shutdown
    await app.state.db_engine.dispose()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

Health Check Endpoint

@health_router.get("/health")
async def health_check(request: Request):
    checks = {}
    try:
        async with request.app.state.db_engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        checks["database"] = "healthy"
    except Exception as e:
        checks["database"] = f"unhealthy: {e}"

    try:
        await request.app.state.redis.ping()
        checks["redis"] = "healthy"
    except Exception as e:
        checks["redis"] = f"unhealthy: {e}"

    status = "healthy" if all(v == "healthy" for v in checks.values()) else "unhealthy"
    return {"status": status, "checks": checks}

Pydantic Settings

from pydantic import Field, field_validator, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    database_url: PostgresDsn
    db_pool_size: int = Field(default=5, ge=1, le=20)
    redis_url: str = "redis://localhost:6379"
    api_key: str = Field(min_length=32)
    debug: bool = False

    @field_validator("database_url", mode="before")
    @classmethod
    def validate_database_url(cls, v: str) -> str:
        if v and "+asyncpg" not in v:
            return v.replace("postgresql://", "postgresql+asyncpg://")
        return v

Exception Handlers

@app.exception_handler(ProblemException)
async def problem_exception_handler(request: Request, exc: ProblemException):
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.to_problem_detail(),
        media_type="application/problem+json",
    )
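
The handler above assumes a custom ProblemException that the skill does not define here; a minimal sketch compatible with RFC 7807 Problem Details might look like:

```python
class ProblemException(Exception):
    """Minimal RFC 7807 Problem Details exception (illustrative sketch)."""

    def __init__(self, status_code: int, title: str, detail: str,
                 type_: str = "about:blank") -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.title = title
        self.detail = detail
        self.type = type_

    def to_problem_detail(self) -> dict:
        # Shape matches the application/problem+json media type
        return {
            "type": self.type,
            "title": self.title,
            "status": self.status_code,
            "detail": self.detail,
        }

problem = ProblemException(404, "Not Found", "User does not exist")
```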

Response Optimization

from fastapi.responses import ORJSONResponse

app = FastAPI(default_response_class=ORJSONResponse)

Incorrect — Creating resources at module level leads to connection leaks:

# Module-level connection (never closed!)
db_engine = create_async_engine(DATABASE_URL)

app = FastAPI()

@app.on_event("startup")  # Deprecated
async def startup():
    await db_engine.connect()

Correct — Lifespan context manager ensures proper resource cleanup:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create resources
    app.state.db_engine = create_async_engine(DATABASE_URL)
    yield
    # Shutdown: guaranteed cleanup
    await app.state.db_engine.dispose()

app = FastAPI(lifespan=lifespan)

Design FastAPI dependency injection for testable and maintainable application architecture — HIGH

FastAPI Dependency Injection

Database Session Dependency

from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends, Request

async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    """Yield database session from app state."""
    async with AsyncSession(
        request.app.state.db_engine,
        expire_on_commit=False,
    ) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Service Dependencies

class AnalysisService:
    def __init__(self, db: AsyncSession, embeddings: EmbeddingsService, llm: LLMService):
        self.db = db
        self.embeddings = embeddings
        self.llm = llm

def get_analysis_service(
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> AnalysisService:
    return AnalysisService(
        db=db,
        embeddings=request.app.state.embeddings,
        llm=request.app.state.llm,
    )

@router.post("/analyses")
async def create_analysis(
    data: AnalysisCreate,
    service: AnalysisService = Depends(get_analysis_service),
):
    return await service.create(data)

Cached Settings

from functools import lru_cache
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    redis_url: str
    api_key: str
    model_config = {"env_file": ".env"}

@lru_cache
def get_settings() -> Settings:
    return Settings()
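
Because of @lru_cache, every call to get_settings - and therefore every Depends(get_settings) resolution - receives the same instance. A stdlib-only demonstration of that caching behavior (FakeSettings stands in for the real Pydantic class to avoid loading a .env):

```python
from functools import lru_cache

class FakeSettings:
    """Stand-in for the Pydantic Settings class."""
    database_url = "postgresql+asyncpg://localhost/demo"

@lru_cache
def get_settings() -> FakeSettings:
    return FakeSettings()  # constructed once, then cached

first = get_settings()
second = get_settings()
```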

Authentication Chain

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = credentials.credentials
    payload = decode_jwt(token)
    user = await db.get(User, payload["sub"])
    if not user:
        raise HTTPException(401, "Invalid credentials")
    return user

async def get_admin_user(user: User = Depends(get_current_user)) -> User:
    if not user.is_admin:
        raise HTTPException(403, "Admin access required")
    return user

Incorrect — Manually creating dependencies couples code and breaks testability:

@router.post("/analyses")
async def create_analysis(data: AnalysisCreate, request: Request):
    db = AsyncSession(request.app.state.db_engine)
    service = AnalysisService(db)  # Cannot mock in tests
    return await service.create(data)

Correct — Depends() enables dependency injection and easy testing:

@router.post("/analyses")
async def create_analysis(
    data: AnalysisCreate,
    service: AnalysisService = Depends(get_analysis_service),
):
    return await service.create(data)
# Tests can override get_analysis_service with mocks

Order FastAPI middleware correctly since it affects every request in the application — HIGH

FastAPI Middleware Patterns

Request ID Middleware

import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

Timing Middleware

import time
from starlette.middleware.base import BaseHTTPMiddleware

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start
        response.headers["X-Response-Time"] = f"{duration:.3f}s"
        return response

Structured Logging Middleware

import structlog
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        log = logger.bind(
            request_id=getattr(request.state, "request_id", None),
            method=request.method,
            path=request.url.path,
        )
        try:
            response = await call_next(request)
            log.info("request_completed", status_code=response.status_code)
            return response
        except Exception as exc:
            log.exception("request_failed", error=str(exc))
            raise

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Response-Time"],
)

Middleware Order

Request-processing order, outermost to innermost:

  1. CORS (outermost)
  2. RequestID
  3. Timing
  4. Logging (innermost)

Because the last middleware added is the outermost and runs first, add them in the reverse order: Logging, then Timing, then RequestID, then CORS.

Incorrect — Wrong middleware order causes missing request IDs in logs:

app.add_middleware(RequestIDMiddleware)  # Added first = innermost, runs second
app.add_middleware(LoggingMiddleware)  # Added last = outermost, runs first without request_id
# Result: Logs missing request_id

Correct — Adding LoggingMiddleware before RequestIDMiddleware makes RequestID outermost:

app.add_middleware(LoggingMiddleware)  # Runs last, has request_id
app.add_middleware(RequestIDMiddleware)  # Runs first, sets request_id
# Last added = outermost = runs first

Configure database connection pools correctly to prevent connection exhaustion under load — MEDIUM

Database Connection Pooling

SQLAlchemy Async Pool Configuration

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,           # Steady-state connections
    max_overflow=10,        # Burst capacity (total max = 30)
    pool_pre_ping=True,     # Validate before use
    pool_recycle=3600,      # Recreate connections after 1 hour
    pool_timeout=30,        # Wait for connection from pool
    connect_args={
        "command_timeout": 60,
        "server_settings": {"statement_timeout": "60000"},
    },
)

Direct asyncpg Pool

import asyncpg

async def setup_connection(conn):
    await conn.execute("SET timezone TO 'UTC'")
    await conn.execute("SET statement_timeout TO '60s'")

pool = await asyncpg.create_pool(
    "postgresql://user:pass@localhost/db",
    min_size=10,
    max_size=20,
    max_inactive_connection_lifetime=300,
    command_timeout=60,
    timeout=30,
    setup=setup_connection,
)

Pool Sizing

Parameter | Small Service | Medium Service | High Load
pool_size | 5-10 | 20-50 | 50-100
max_overflow | 5 | 10-20 | 20-50
pool_pre_ping | True | True | Consider False*
pool_recycle | 3600 | 1800 | 900

pool_size = (concurrent_requests / avg_queries_per_request) * 1.5
Example: (100 concurrent / 3 queries per request) * 1.5 = 50
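
The formula as a small helper, with the worked example above as a check (the function name is illustrative):

```python
import math

def recommended_pool_size(concurrent_requests: int, avg_queries_per_request: int) -> int:
    """pool_size = (concurrent_requests / avg_queries_per_request) * 1.5, rounded up."""
    # Multiply before dividing to avoid float rounding on the ratio
    return math.ceil(concurrent_requests * 1.5 / avg_queries_per_request)

size = recommended_pool_size(100, 3)  # the worked example above
```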

Incorrect — Creating engine per request exhausts database connections:

@app.get("/users")
async def get_users():
    engine = create_async_engine(DATABASE_URL)  # New pool every request!
    async with AsyncSession(engine) as session:
        return await session.execute(select(User))

Correct — Reuse single engine from lifespan for all requests:

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.engine = create_async_engine(DATABASE_URL, pool_size=20)
    yield
    await app.state.engine.dispose()

@app.get("/users")
async def get_users(request: Request):
    async with AsyncSession(request.app.state.engine) as session:
        return await session.execute(select(User))

Reuse HTTP sessions with connection pooling to prevent churn and improve throughput — MEDIUM

HTTP Connection Pooling

aiohttp Session Pool

import aiohttp
from aiohttp import TCPConnector

connector = TCPConnector(
    limit=100,              # Total connections
    limit_per_host=20,      # Per-host limit
    keepalive_timeout=30,   # Keep-alive duration
    ssl=False,              # Disables TLS cert verification - use an ssl.SSLContext in production
    ttl_dns_cache=300,      # DNS cache TTL
)

session = aiohttp.ClientSession(
    connector=connector,
    timeout=aiohttp.ClientTimeout(
        total=30,           # Total request timeout
        connect=10,         # Connection timeout
        sock_read=20,       # Read timeout
    ),
)

# IMPORTANT: Reuse session across requests
# Create once at startup, close at shutdown

FastAPI Lifespan Integration

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.db_pool = await asyncpg.create_pool(DATABASE_URL)
    app.state.http_session = aiohttp.ClientSession(
        connector=TCPConnector(limit=100)
    )
    yield
    await app.state.db_pool.close()
    await app.state.http_session.close()

app = FastAPI(lifespan=lifespan)

Key Principles

  • Never create ClientSession per request (connection churn)
  • Create session at startup, close at shutdown
  • Set limit_per_host to prevent overwhelming a single service
  • Configure DNS caching for high-throughput scenarios

Incorrect — Creating session per request causes connection churn and poor performance:

@app.get("/fetch")
async def fetch_data():
    async with aiohttp.ClientSession() as session:  # New session every request!
        async with session.get("https://api.example.com") as resp:
            return await resp.json()

Correct — Reuse session from lifespan for connection pooling and keep-alive:

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.http_session = aiohttp.ClientSession(
        connector=TCPConnector(limit=100, limit_per_host=20)
    )
    yield
    await app.state.http_session.close()

@app.get("/fetch")
async def fetch_data(request: Request):
    async with request.app.state.http_session.get("https://api.example.com") as resp:
        return await resp.json()

Monitor connection pools to prevent silent exhaustion and stale connection errors — MEDIUM

Pool Monitoring & Tuning

Pool Monitoring with Prometheus

from prometheus_client import Gauge

pool_size = Gauge("db_pool_size", "Current pool size")
pool_available = Gauge("db_pool_available", "Available connections")

async def collect_pool_metrics(pool: asyncpg.Pool):
    pool_size.set(pool.get_size())
    pool_available.set(pool.get_idle_size())
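
Collection needs to run periodically; one approach is a background task started from lifespan and cancelled on shutdown. A sketch (interval, names, and the fake collector are illustrative):

```python
import asyncio
from typing import Any, Awaitable, Callable

async def pool_metrics_loop(
    pool: Any,
    collect: Callable[[Any], Awaitable[None]],
    interval: float = 15.0,
) -> None:
    """Invoke the collector every `interval` seconds until cancelled."""
    while True:
        await collect(pool)
        await asyncio.sleep(interval)

# Demonstration with a fake collector (in production: collect_pool_metrics)
calls: list[Any] = []

async def fake_collect(pool: Any) -> None:
    calls.append(pool)

async def demo() -> None:
    task = asyncio.create_task(pool_metrics_loop("fake-pool", fake_collect, interval=0.01))
    await asyncio.sleep(0.05)
    task.cancel()            # shutdown: cancel the loop...
    try:
        await task
    except asyncio.CancelledError:
        pass                 # ...and absorb the cancellation at the top level

asyncio.run(demo())
```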

Connection Exhaustion Diagnosis

# Symptom: "QueuePool limit reached" or timeouts
from sqlalchemy import event

@event.listens_for(engine.sync_engine, "checkout")
def log_checkout(dbapi_conn, conn_record, conn_proxy):
    print(f"Connection checked out: {id(dbapi_conn)}")

@event.listens_for(engine.sync_engine, "checkin")
def log_checkin(dbapi_conn, conn_record):
    print(f"Connection returned: {id(dbapi_conn)}")

# Fix: Ensure connections are returned
async with session.begin():
    pass  # Connection returned here

Stale Connection Handling

# Fix 1: Enable pool_pre_ping
engine = create_async_engine(url, pool_pre_ping=True)

# Fix 2: Reduce pool_recycle
engine = create_async_engine(url, pool_recycle=900)

# Fix 3: Application-level retry
from sqlalchemy.exc import DBAPIError

async def with_retry(session, operation, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await operation(session)
        except DBAPIError as e:
            if attempt == max_retries - 1:
                raise
            await session.rollback()

Anti-Patterns

# NEVER create engine/pool per request
async def get_data():
    engine = create_async_engine(url)  # WRONG - pool per request!

# NEVER create ClientSession per request
async def fetch():
    async with aiohttp.ClientSession() as session:  # WRONG!
        return await session.get(url)

# NEVER forget to close pools on shutdown
# NEVER set pool_size too high (exhausts DB connections)

Incorrect — No pool monitoring leads to silent connection exhaustion:

# No visibility into pool state
engine = create_async_engine(url, pool_size=20)
# App runs slow, no idea why (pool exhausted)

Correct — Prometheus metrics reveal pool exhaustion before timeouts occur:

from prometheus_client import Gauge

pool_size_gauge = Gauge("db_pool_size", "DB pool size")
pool_available_gauge = Gauge("db_pool_available", "Available connections")

async def collect_metrics(pool):
    pool_size_gauge.set(pool.get_size())
    pool_available_gauge.set(pool.get_idle_size())
# Alert when pool_available approaches 0

Implement repository pattern and bulk operations for maintainable SQLAlchemy data access — HIGH

Repository & Bulk Operations

Generic Repository Pattern

from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T", bound=Base)

class AsyncRepository(Generic[T]):
    """Generic async repository for CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)

    async def get_many(self, ids: list[UUID]) -> list[T]:
        result = await self.session.execute(
            select(self.model).where(self.model.id.in_(ids))
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, instance: T) -> None:
        await self.session.delete(instance)
        await self.session.flush()

Bulk Operations

async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
    """Efficient bulk insert."""
    users = [User(**data) for data in users_data]
    db.add_all(users)
    await db.flush()
    return len(users)

async def bulk_insert_chunked(
    db: AsyncSession, items: list[dict], chunk_size: int = 1000,
) -> int:
    """Insert large datasets in chunks to manage memory."""
    total = 0
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        db.add_all([Item(**data) for data in chunk])
        await db.flush()
        total += len(chunk)
    return total

Key Principles

  • Use flush() for ID generation without committing the transaction
  • Chunk bulk inserts at 1000-10000 rows for memory management
  • One repository per aggregate root
  • Transaction boundary at the service layer, not repository

Incorrect — Adding entities one-by-one in loop causes N round trips:

async def create_users(db: AsyncSession, users_data: list[dict]):
    for user_data in users_data:  # 1000 loop iterations
        user = User(**user_data)
        db.add(user)
        await db.flush()  # 1000 round trips to DB!

Correct — Bulk operations reduce round trips and improve performance:

async def create_users(db: AsyncSession, users_data: list[dict]):
    users = [User(**data) for data in users_data]
    db.add_all(users)  # Single round trip
    await db.flush()

Configure eager loading for SQLAlchemy relationships to prevent N+1 query performance problems — HIGH

Relationships & Eager Loading

Eager Loading (Avoid N+1)

from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy import select

async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
    """Load user with orders in single query - NO N+1."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
    """Load multiple users with orders efficiently."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .limit(limit)
    )
    return list(result.scalars().all())

Concurrent Queries (Session Safety)

async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
    """Sequential queries with same session (safe)."""
    # WRONG: Don't share AsyncSession across tasks
    # async with asyncio.TaskGroup() as tg:
    #     tg.create_task(db.execute(...))  # NOT SAFE

    # CORRECT: Sequential queries with same session
    user = await db.get(User, user_id)
    orders_result = await db.execute(
        select(Order).where(Order.user_id == user_id).limit(10)
    )
    return {"user": user, "recent_orders": list(orders_result.scalars().all())}

async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
    """Concurrent queries - each task gets its own session."""
    async def fetch_user(user_id: UUID) -> dict:
        async with async_session_factory() as session:
            user = await session.get(User, user_id)
            return {"id": user_id, "email": user.email if user else None}

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
    return [t.result() for t in tasks]

Key Decisions

| Decision | Recommendation | Rationale |
|---|---|---|
| Lazy loading | lazy="raise" + explicit loads | Prevents accidental N+1 |
| Eager loading | selectinload for collections | Better than joinedload for async |
| Concurrent queries | Separate sessions per task | AsyncSession is NOT safe for concurrent use |

Incorrect — Lazy loading causes N+1 query problem (1 + 100 queries):

users = await db.execute(select(User).limit(100))
for user in users.scalars():
    print(user.orders)  # Lazy load per user: N extra queries (or an error in async)
# Total queries: 1 (users) + 100 (orders) = 101 queries

Correct — Eager loading with selectinload fetches all data in 2 queries:

users = await db.execute(
    select(User).options(selectinload(User.orders)).limit(100)
)
for user in users.scalars():
    print(user.orders)  # Already loaded, no extra query
# Total queries: 2 (users + orders in batch)

Configure SQLAlchemy sessions and engines correctly to prevent connection leaks and lazy load errors — HIGH

SQLAlchemy Sessions & Models

Engine and Session Factory

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

# Create async engine - ONE per application
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,
    echo=False,
)

# Session factory
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevent lazy load issues
    autoflush=False,
)

FastAPI Dependency

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)) -> UserResponse:
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(404, "User not found")
    return UserResponse.model_validate(user)

Model Definition

import uuid
from datetime import datetime, timezone

from sqlalchemy import String, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))

    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",  # Prevent accidental lazy loads
    )

Key Principles

  • One create_async_engine() per application
  • Set expire_on_commit=False to prevent lazy load errors after commit
  • Set pool_pre_ping=True for production connection validation
  • Use lazy="raise" on all relationships

Incorrect — expire_on_commit=True expires objects on commit, so the next attribute access triggers an implicit refresh, which fails in async code:

async_session_factory = async_sessionmaker(
    engine, expire_on_commit=True  # Default, causes issues
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one()
await session.commit()
print(user.email)  # ERROR: expired attribute triggers a refresh outside await

Correct — expire_on_commit=False keeps loaded attributes accessible after commit:

async_session_factory = async_sessionmaker(
    engine, expire_on_commit=False  # Prevents lazy load errors
)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one()
await session.commit()
print(user.email)  # Works - attribute still loaded

References (6)

Eager Loading

Eager Loading Patterns for Async SQLAlchemy

The N+1 Problem in Async

# BAD: N+1 queries - one for users, N for orders
async def get_users_bad(db: AsyncSession) -> list[User]:
    result = await db.execute(select(User))
    users = result.scalars().all()
    for user in users:
        # This triggers N additional queries (or raises if lazy="raise")
        print(user.orders)
    return users

# GOOD: Single query with eager loading
async def get_users_good(db: AsyncSession) -> list[User]:
    result = await db.execute(
        select(User).options(selectinload(User.orders))
    )
    users = result.scalars().all()
    for user in users:
        print(user.orders)  # Already loaded
    return users

Loading Strategies

selectinload (Best for Collections)

from sqlalchemy.orm import selectinload

# Loads orders in separate SELECT ... WHERE user_id IN (...)
result = await db.execute(
    select(User)
    .options(selectinload(User.orders))
    .limit(100)
)

joinedload (Best for Single Relations)

from sqlalchemy.orm import joinedload

# Uses LEFT JOIN - good for to-one relationships
result = await db.execute(
    select(Order)
    .options(joinedload(Order.user))
    .where(Order.status == "pending")
)

Nested Eager Loading

# Load user -> orders -> order_items
result = await db.execute(
    select(User)
    .options(
        selectinload(User.orders).selectinload(Order.items)
    )
)

# Load user -> orders and user -> addresses
result = await db.execute(
    select(User)
    .options(
        selectinload(User.orders),
        selectinload(User.addresses),
    )
)

Configuring Models to Prevent Lazy Load

from sqlalchemy.orm import relationship, Mapped

class User(Base):
    __tablename__ = "users"

    id: Mapped[UUID] = mapped_column(primary_key=True)

    # lazy="raise" prevents accidental lazy loading
    # Forces explicit eager loading
    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",  # Raises if accessed without eager load
    )

    # For optional relationships you might want loaded
    profile: Mapped["Profile"] = relationship(
        lazy="joined",  # Always joined (use sparingly)
    )

Strategy Comparison

| Strategy | SQL | Best For | Async Safe |
|---|---|---|---|
| selectinload | Separate IN query | Collections | Yes |
| joinedload | LEFT JOIN | Single/to-one | Yes |
| subqueryload | Subquery | Large collections | Yes |
| lazy="select" | On access | Never in async | No |
| lazy="raise" | Raises error | Forcing explicit loads | Yes |

Write-Only Loading for Large Collections

from sqlalchemy.orm import WriteOnlyMapped

class User(Base):
    # For very large collections, use a write-only relationship.
    # (lazy="dynamic" is legacy and is not supported with AsyncSession.)
    orders: WriteOnlyMapped["Order"] = relationship(
        lazy="write_only",  # Never loads implicitly; yields a query builder
    )

# Usage
async def get_recent_orders(db: AsyncSession, user_id: UUID) -> list[Order]:
    user = await db.get(User, user_id)
    # .select() returns a SELECT already filtered to this user's orders
    stmt = user.orders.select().order_by(Order.created_at.desc()).limit(10)
    result = await db.execute(stmt)
    return list(result.scalars().all())

Fastapi Integration

FastAPI + SQLAlchemy 2.0 Async Integration

Complete Setup

# app/db/session.py
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)
from app.core.config import settings

engine = create_async_engine(
    settings.DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,
    echo=settings.DEBUG,
)

async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
)

Dependency Injection

# app/api/deps.py
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session_factory

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Provide database session with automatic cleanup."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Route with Database Access

# app/api/v1/routes/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_db
from app.models.user import User
from app.schemas.user import UserResponse, UserCreate

router = APIRouter(prefix="/users", tags=["users"])

@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: UUID,
    db: AsyncSession = Depends(get_db),
) -> UserResponse:
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "User not found")
    return UserResponse.model_validate(user)

@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_in: UserCreate,
    db: AsyncSession = Depends(get_db),
) -> UserResponse:
    user = User(**user_in.model_dump())
    db.add(user)
    await db.flush()  # Get ID without committing
    await db.refresh(user)  # Load any defaults
    return UserResponse.model_validate(user)

Service Layer Pattern

# app/services/user_service.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate

class UserService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get(self, user_id: UUID) -> User | None:
        return await self.db.get(User, user_id)

    async def get_by_email(self, email: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def create(self, user_in: UserCreate) -> User:
        user = User(**user_in.model_dump())
        self.db.add(user)
        await self.db.flush()
        return user

    async def update(self, user: User, user_in: UserUpdate) -> User:
        for field, value in user_in.model_dump(exclude_unset=True).items():
            setattr(user, field, value)
        await self.db.flush()
        return user

# Usage in route
@router.post("/")
async def create_user(
    user_in: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    service = UserService(db)
    if await service.get_by_email(user_in.email):
        raise HTTPException(400, "Email already registered")
    return await service.create(user_in)

Transaction Management

# Explicit transaction control
@router.post("/transfer")
async def transfer_funds(
    transfer: TransferRequest,
    db: AsyncSession = Depends(get_db),
):
    async with db.begin():  # Explicit transaction
        from_account = await db.get(Account, transfer.from_id, with_for_update=True)
        to_account = await db.get(Account, transfer.to_id, with_for_update=True)

        if from_account.balance < transfer.amount:
            raise HTTPException(400, "Insufficient funds")

        from_account.balance -= transfer.amount
        to_account.balance += transfer.amount
        # Commits automatically on exit, rolls back on exception

Lifespan with Database

# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy import text
from app.db.session import engine

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: verify database connection
    async with engine.begin() as conn:
        await conn.execute(text("SELECT 1"))

    yield

    # Shutdown: dispose engine
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

Middleware Stack

FastAPI Middleware Stack

Complete guide to middleware ordering and implementation in FastAPI.

Middleware Execution Order

REQUEST                                              RESPONSE
   │                                                    ▲
   ▼                                                    │
┌──────────────────────────────────────────────────────────────┐
│  1. CORS Middleware (outermost)                              │
│     - Handles preflight requests                             │
│     - Adds CORS headers to response                          │
└──────────────────────────────────────────────────────────────┘
   │                                                    ▲
   ▼                                                    │
┌──────────────────────────────────────────────────────────────┐
│  2. Request ID Middleware                                    │
│     - Generates/extracts request ID                          │
│     - Adds to response headers                               │
└──────────────────────────────────────────────────────────────┘
   │                                                    ▲
   ▼                                                    │
┌──────────────────────────────────────────────────────────────┐
│  3. Timing Middleware                                        │
│     - Records start time                                     │
│     - Calculates duration                                    │
│     - Adds X-Response-Time header                            │
└──────────────────────────────────────────────────────────────┘
   │                                                    ▲
   ▼                                                    │
┌──────────────────────────────────────────────────────────────┐
│  4. Logging Middleware                                       │
│     - Logs request details                                   │
│     - Logs response status                                   │
│     - Uses request ID for correlation                        │
└──────────────────────────────────────────────────────────────┘
   │                                                    ▲
   ▼                                                    │
┌──────────────────────────────────────────────────────────────┐
│  5. Authentication Middleware (optional)                     │
│     - Validates JWT/API key                                  │
│     - Sets request.state.user                                │
└──────────────────────────────────────────────────────────────┘
   │                                                    ▲
   ▼                                                    │
┌──────────────────────────────────────────────────────────────┐
│  6. Rate Limit Middleware                                    │
│     - Checks rate limits                                     │
│     - Returns 429 if exceeded                                │
│     - Adds rate limit headers                                │
└──────────────────────────────────────────────────────────────┘
   │                                                    ▲
   ▼                                                    │
┌──────────────────────────────────────────────────────────────┐
│                    ROUTE HANDLER                             │
│               (Your endpoint code)                           │
└──────────────────────────────────────────────────────────────┘

Note: Middleware added LAST executes FIRST on the request path; each add_middleware() call wraps everything registered before it.

Middleware Registration Order

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# 6. Rate Limit (added last, runs closest to route)
app.add_middleware(RateLimitMiddleware)

# 5. Authentication (optional)
app.add_middleware(AuthMiddleware)

# 4. Logging
app.add_middleware(LoggingMiddleware)

# 3. Timing
app.add_middleware(TimingMiddleware)

# 2. Request ID
app.add_middleware(RequestIDMiddleware)

# 1. CORS (added first, runs first/last)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # Avoid "*" when allow_credentials=True
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Core Middleware Implementations

Request ID Middleware

import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    """Add unique request ID to each request."""

    async def dispatch(self, request: Request, call_next):
        # Get from header or generate new
        request_id = request.headers.get(
            "X-Request-ID",
            str(uuid.uuid4()),
        )

        # Store in request state
        request.state.request_id = request_id

        # Call next middleware/route
        response = await call_next(request)

        # Add to response headers
        response.headers["X-Request-ID"] = request_id

        return response

Timing Middleware

import time
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class TimingMiddleware(BaseHTTPMiddleware):
    """Track request processing time."""

    async def dispatch(self, request: Request, call_next):
        start_time = time.perf_counter()

        response = await call_next(request)

        duration = time.perf_counter() - start_time
        response.headers["X-Response-Time"] = f"{duration:.4f}s"

        # Store for logging middleware
        request.state.duration = duration

        return response

Logging Middleware

import structlog
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

logger = structlog.get_logger()

class LoggingMiddleware(BaseHTTPMiddleware):
    """Structured logging for all requests."""

    async def dispatch(self, request: Request, call_next):
        # Bind request context
        log = logger.bind(
            request_id=getattr(request.state, "request_id", None),
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else None,
        )

        try:
            response = await call_next(request)

            log.info(
                "request_completed",
                status_code=response.status_code,
                duration=getattr(request.state, "duration", None),
            )

            return response

        except Exception as exc:
            log.exception("request_failed", error=str(exc))
            raise

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    # Production: specify exact origins
    allow_origins=[
        "https://app.example.com",
        "https://admin.example.com",
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["*"],
    expose_headers=[
        "X-Request-ID",
        "X-Response-Time",
        "X-RateLimit-Limit",
        "X-RateLimit-Remaining",
    ],
    max_age=600,  # Preflight cache 10 minutes
)

Advanced Patterns

Conditional Middleware

class ConditionalMiddleware(BaseHTTPMiddleware):
    """Middleware that only applies to certain paths."""

    def __init__(self, app, paths: list[str] | None = None, exclude_paths: list[str] | None = None):
        super().__init__(app)
        self.paths = paths or []
        self.exclude_paths = exclude_paths or []

    async def dispatch(self, request: Request, call_next):
        path = request.url.path

        # Skip excluded paths
        if any(path.startswith(p) for p in self.exclude_paths):
            return await call_next(request)

        # Only apply to specific paths if defined
        if self.paths and not any(path.startswith(p) for p in self.paths):
            return await call_next(request)

        # Apply middleware logic
        return await self._apply_middleware(request, call_next)

    async def _apply_middleware(self, request: Request, call_next):
        # Your middleware logic here
        return await call_next(request)

Error Handling Middleware

from fastapi.responses import JSONResponse

class ErrorHandlingMiddleware(BaseHTTPMiddleware):
    """Catch unhandled exceptions and return proper responses."""

    async def dispatch(self, request: Request, call_next):
        try:
            return await call_next(request)

        except Exception as exc:
            logger.exception(
                "unhandled_exception",
                request_id=getattr(request.state, "request_id", None),
                path=request.url.path,
            )

            return JSONResponse(
                status_code=500,
                content={
                    "type": "https://api.example.com/problems/internal-error",
                    "title": "Internal Server Error",
                    "status": 500,
                    "detail": "An unexpected error occurred",
                    "request_id": getattr(request.state, "request_id", None),
                },
                media_type="application/problem+json",
            )

Request Body Caching

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class BodyCacheMiddleware(BaseHTTPMiddleware):
    """Cache request body for multiple reads."""

    async def dispatch(self, request: Request, call_next):
        # Only cache for methods with body
        if request.method in ("POST", "PUT", "PATCH"):
            body = await request.body()
            request.state.body = body

            # Create new receive that returns cached body
            async def receive():
                return {"type": "http.request", "body": body}

            request._receive = receive

        return await call_next(request)

Performance Considerations

Async vs Sync Middleware

# GOOD: Async middleware (non-blocking)
class AsyncMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        await asyncio.sleep(0)  # Async operation
        return await call_next(request)

# BAD: Sync operations in async middleware
class BadMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        time.sleep(1)  # BLOCKS EVENT LOOP!
        return await call_next(request)

Middleware vs Dependencies

| Middleware | Dependencies |
|---|---|
| Runs on ALL requests | Runs on specific routes |
| No access to path params | Access to path params |
| Before route matching | After route matching |
| For cross-cutting concerns | For route-specific logic |

Use Middleware for:

  • Request ID generation
  • Logging
  • CORS
  • Timing

Use Dependencies for:

  • Authentication
  • Rate limiting per endpoint
  • Request validation
  • Database sessions

Testing Middleware

# tests/test_middleware.py
import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_request_id_generated(client: AsyncClient):
    response = await client.get("/health")

    assert "X-Request-ID" in response.headers
    # Should be valid UUID
    import uuid
    uuid.UUID(response.headers["X-Request-ID"])

@pytest.mark.asyncio
async def test_request_id_preserved(client: AsyncClient):
    custom_id = "my-custom-id-123"
    response = await client.get(
        "/health",
        headers={"X-Request-ID": custom_id},
    )

    assert response.headers["X-Request-ID"] == custom_id

@pytest.mark.asyncio
async def test_timing_header_present(client: AsyncClient):
    response = await client.get("/health")

    assert "X-Response-Time" in response.headers
    # Should be a valid duration
    duration = float(response.headers["X-Response-Time"].rstrip("s"))
    assert duration > 0
  • See examples/fastapi-middleware.md for complete examples
  • See scripts/middleware-stack.py for copy-paste template
  • See SKILL.md for lifespan and dependencies

Pool Sizing

Connection Pool Sizing Guide

Database Pool Sizing

Formula

pool_size = (requests_per_second * avg_query_duration_seconds) * safety_factor

Where:
- requests_per_second: Peak RPS to handle
- avg_query_duration_seconds: Average time a connection is held
- safety_factor: 1.5 - 2.0 for headroom
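
The formula drops straight into code. A minimal sketch (the `floor=5` minimum is an assumption, added so tiny services don't end up with a 1-connection pool):

```python
import math

def pool_size(requests_per_second: float, avg_query_seconds: float,
              safety_factor: float = 2.0, floor: int = 5) -> int:
    """Connections needed at peak load, with headroom (Little's law:
    connections in use = arrival rate * time each connection is held)."""
    needed = requests_per_second * avg_query_seconds * safety_factor
    return max(floor, math.ceil(needed))

print(pool_size(100, 0.05))  # medium service: 10
print(pool_size(500, 0.05))  # high traffic: 50
```

With safety_factor=2.0 this reproduces the example rows below.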

Examples

| Scenario | RPS | Query Duration | Pool Size |
|---|---|---|---|
| Low traffic API | 10 | 50ms | 5 |
| Medium service | 100 | 50ms | 10 |
| High traffic API | 500 | 50ms | 50 |
| Slow queries | 100 | 200ms | 40 |
| Batch processing | 50 | 500ms | 50 |

PostgreSQL Connection Limits

-- Check max connections
SHOW max_connections;  -- Default: 100

-- Check current connections
SELECT count(*) FROM pg_stat_activity;

-- Check per-database limit
SELECT datname, numbackends FROM pg_stat_database;

Important: Total connections across all app instances must not exceed max_connections.

Total connections = (pool_size + max_overflow) * num_instances

Example:
- max_connections = 100
- Reserve 10 for admin
- 90 available for apps
- 3 app instances
- pool_size per instance = 30
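
The same arithmetic as a helper (the function name is illustrative):

```python
def per_instance_pool_size(max_connections: int, reserved_admin: int,
                           num_instances: int) -> int:
    """Largest per-instance pool that cannot exhaust the database server."""
    available = max_connections - reserved_admin
    return available // num_instances

# The worked example: 100 max_connections, 10 reserved, 3 instances
print(per_instance_pool_size(100, 10, 3))  # 30
```

If max_overflow is non-zero, budget pool_size + max_overflow per instance instead, since overflow connections also count against max_connections.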

max_overflow Setting

max_overflow allows temporary connections above pool_size.

engine = create_async_engine(
    url,
    pool_size=20,       # Normal capacity
    max_overflow=10,    # Burst capacity (total max = 30)
)

Guidelines

| Traffic Pattern | max_overflow |
|---|---|
| Steady load | 0-25% of pool_size |
| Bursty traffic | 50-100% of pool_size |
| Unpredictable | 100% of pool_size |

HTTP Connection Pool Sizing

aiohttp TCPConnector

connector = TCPConnector(
    limit=100,          # Total connections
    limit_per_host=20,  # Per-host limit
)

Guidelines

limit = num_external_services * connections_per_service * safety_factor

limit_per_host = concurrent_requests_to_host * 1.5

Example

| External Service | Concurrent Calls | Connections |
|---|---|---|
| Payment API | 10 | 15 |
| Email service | 5 | 8 |
| Analytics | 20 | 30 |
| Total | 35 | 53 |

Set limit=60 (with headroom).

Monitoring Pool Health

Key Metrics

# SQLAlchemy
pool = engine.pool
print(f"Size: {pool.size()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")

# asyncpg
pool = await asyncpg.create_pool(...)
print(f"Size: {pool.get_size()}")
print(f"Free: {pool.get_idle_size()}")
print(f"Min: {pool.get_min_size()}")
print(f"Max: {pool.get_max_size()}")

Alert Thresholds

| Metric | Warning | Critical |
|---|---|---|
| Utilization | > 70% | > 90% |
| Wait time | > 100ms | > 1s |
| Overflow usage | > 50% | > 80% |
| Failed checkouts | > 1/min | > 10/min |
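
The thresholds can be wired into a periodic health check. A minimal sketch using the utilization and overflow thresholds from the table above (the alert-level names are assumptions):

```python
def classify_pool_utilization(checked_out: int, pool_size: int,
                              overflow_in_use: int, max_overflow: int) -> str:
    """Map live pool metrics to an alert level using the table's thresholds."""
    utilization = checked_out / pool_size if pool_size else 0.0
    overflow_usage = overflow_in_use / max_overflow if max_overflow else 0.0
    if utilization > 0.90 or overflow_usage > 0.80:
        return "critical"
    if utilization > 0.70 or overflow_usage > 0.50:
        return "warning"
    return "ok"

print(classify_pool_utilization(19, 20, 0, 10))  # critical
```

Feed it `pool.checkedout()` / `pool.overflow()` from the SQLAlchemy snippet above on a timer and emit the result as a metric.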

Semaphore Patterns

Semaphore Patterns for Concurrency Limiting

Basic Rate Limiting

import asyncio
import aiohttp

class RateLimitedClient:
    """HTTP client with concurrency and rate limiting."""

    def __init__(
        self,
        max_concurrent: int = 10,
        requests_per_second: float = 100,
    ):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._rate_limiter = AsyncRateLimiter(requests_per_second)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()

    async def get(self, url: str) -> dict:
        async with self._semaphore:
            await self._rate_limiter.acquire()
            async with self._session.get(url) as resp:
                return await resp.json()


class AsyncRateLimiter:
    """Token bucket rate limiter."""

    def __init__(self, rate: float):
        self._rate = rate
        self._tokens = rate
        self._last_update: float | None = None
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            # get_running_loop() is the non-deprecated way to read the loop clock
            now = asyncio.get_running_loop().time()
            if self._last_update is None:
                self._last_update = now
            self._tokens = min(
                self._rate,
                self._tokens + (now - self._last_update) * self._rate
            )
            self._last_update = now

            if self._tokens < 1:
                wait_time = (1 - self._tokens) / self._rate
                await asyncio.sleep(wait_time)
                self._tokens = 0
            else:
                self._tokens -= 1

Database Connection Limiting

class DatabasePool:
    """Async database pool with connection limiting."""

    def __init__(self, dsn: str, max_connections: int = 20):
        self._dsn = dsn
        self._semaphore = asyncio.Semaphore(max_connections)
        self._pool = None

    async def execute(self, query: str, *args) -> list:
        async with self._semaphore:
            async with self._pool.acquire() as conn:
                return await conn.fetch(query, *args)

    async def execute_many(self, queries: list[tuple[str, tuple]]) -> list:
        """Execute multiple queries with connection limiting."""
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(self.execute(q, *args))
                for q, args in queries
            ]
        return [t.result() for t in tasks]

Bounded Work Queue

class BoundedWorkQueue:
    """Process items with bounded concurrency."""

    def __init__(self, max_workers: int = 10):
        self._semaphore = asyncio.Semaphore(max_workers)
        self._results: list = []

    async def process_all(
        self,
        items: list,
        processor: Callable[[Any], Awaitable[Any]],
    ) -> list:
        async def bounded_process(item):
            async with self._semaphore:
                return await processor(item)

        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(bounded_process(item)) for item in items]

        return [t.result() for t in tasks]

Common Pitfalls

# WRONG: Creating semaphore inside coroutine
async def bad_fetch(url: str):
    sem = asyncio.Semaphore(10)  # New semaphore each call!
    async with sem:
        return await fetch(url)

# CORRECT: Share semaphore across calls
SEM = asyncio.Semaphore(10)

async def good_fetch(url: str):
    async with SEM:
        return await fetch(url)

# WRONG: Semaphore without timeout
async with sem:
    await potentially_slow_operation()  # Can block other tasks indefinitely

# CORRECT: Semaphore with timeout
async with asyncio.timeout(30):
    async with sem:
        await potentially_slow_operation()

Taskgroup Patterns

TaskGroup Patterns

Basic TaskGroup Usage

import asyncio
from collections.abc import Coroutine
from typing import Any, TypeVar

T = TypeVar("T")

async def fetch_all_concurrent(coros: list[Coroutine[Any, Any, T]]) -> list[T]:
    """Run all coroutines concurrently, fail-fast on any exception."""
    async with asyncio.TaskGroup() as tg:
        created = [tg.create_task(coro) for coro in coros]
    return [t.result() for t in created]

TaskGroup with Partial Failure Handling

async def fetch_with_partial_failures(urls: list[str]) -> tuple[list[dict], list[str]]:
    """Collect successes and failures separately."""
    successes = []
    failures = []

    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [(url, tg.create_task(fetch(url))) for url in urls]
    except* Exception:
        # TaskGroup failed - collect individual results
        for url, task in tasks:
            if task.done():
                try:
                    successes.append(task.result())
                except Exception:
                    failures.append(url)
            else:
                failures.append(url)
    else:
        successes = [t.result() for _, t in tasks]

    return successes, failures

TaskGroup with Timeout per Task

async def fetch_with_individual_timeouts(
    items: list[dict],
    timeout_per_item: float = 5.0,
) -> list[dict | None]:
    """Each task has its own timeout."""
    async def fetch_with_timeout(item: dict) -> dict | None:
        try:
            async with asyncio.timeout(timeout_per_item):
                return await process_item(item)
        except asyncio.TimeoutError:
            return None

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_with_timeout(item)) for item in items]

    return [t.result() for t in tasks]

TaskGroup vs gather Comparison

| Feature | TaskGroup | gather |
|---|---|---|
| Cancellation | Automatic on first failure | Manual with return_exceptions |
| Exception handling | ExceptionGroup | List or raises first |
| Structured concurrency | Yes | No |
| Task cleanup | Guaranteed | Manual |
| Python version | 3.11+ | 3.4+ |

When to Still Use gather

# Only for Python 3.10 compatibility or return_exceptions pattern
results = await asyncio.gather(
    *tasks,
    return_exceptions=True  # Collect all, don't fail fast
)

# Filter successes and failures
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]

Checklists (4)

Async Implementation Checklist

Before Starting

  • Python version >= 3.11 (for TaskGroup, ExceptionGroup)
  • Using async-compatible libraries (aiohttp, asyncpg, aiofiles)
  • No blocking sync calls in async code paths

TaskGroup Usage

  • Using async with asyncio.TaskGroup() instead of asyncio.gather()
  • All tasks created with tg.create_task()
  • Handling ExceptionGroup with except* syntax
  • No fire-and-forget create_task() outside TaskGroup

Timeout Handling

  • Using async with asyncio.timeout(seconds) for deadlines
  • Timeout values are reasonable for the operation
  • Timeout exceptions are caught and handled appropriately

Cancellation Safety

  • Never swallowing asyncio.CancelledError
  • Always re-raising CancelledError after cleanup
  • Resources cleaned up in finally blocks
  • No yield inside async with timeout/taskgroup contexts

Concurrency Limiting

  • Using asyncio.Semaphore for rate limiting
  • Semaphore created once, not per-call
  • Semaphore combined with timeout to prevent deadlock
  • Max concurrency matches resource limits (connections, API rate)

Sync-to-Async Bridge

  • Using asyncio.to_thread() for blocking sync code
  • Not calling asyncio.run() inside async context
  • Thread pool sized appropriately for workload
  • CPU-bound work offloaded to process pool or worker
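For blocking I/O, the `asyncio.to_thread()` bridge looks like this (the `count_lines` helper is illustrative):

```python
import asyncio

def count_lines(path: str) -> int:
    # Ordinary blocking file I/O - fine in a thread, never on the event loop
    with open(path) as f:
        return sum(1 for _ in f)

async def count_lines_async(path: str) -> int:
    # asyncio.to_thread keeps the event loop responsive while the
    # blocking call runs in the default thread pool
    return await asyncio.to_thread(count_lines, path)
```

CPU-bound work should instead go to a `ProcessPoolExecutor` via `loop.run_in_executor`, since threads do not escape the GIL.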

Testing

  • Using pytest-asyncio for async tests
  • Tests use @pytest.mark.asyncio decorator
  • Mock async functions return coroutines or use AsyncMock
  • Timeouts added to prevent hanging tests
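A minimal sketch of an async test with `AsyncMock` (assumes pytest-asyncio is installed; the mocked `client.fetch` call is hypothetical):

```python
import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_service_awaits_client():
    client = AsyncMock()
    client.fetch.return_value = {"id": 1}  # awaited calls resolve to this

    result = await client.fetch("users/1")

    assert result == {"id": 1}
    client.fetch.assert_awaited_once_with("users/1")
```

`AsyncMock` methods return coroutines automatically, so no manual wrapping is needed.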

Performance

  • Connection pools are shared (not created per-request)
  • HTTP sessions reused across requests
  • Batch operations where possible
  • Monitoring for event loop blocking (> 100ms)

Connection Pool Checklist

Database Pool Configuration

  • Using create_async_engine (not sync engine)
  • pool_size matches expected concurrent load
  • max_overflow allows for burst traffic
  • pool_pre_ping=True for connection validation
  • pool_recycle set to prevent stale connections
  • pool_timeout set with reasonable wait time

Connection Health

  • Query timeouts configured (statement_timeout)
  • Connection timeouts configured (connect_timeout)
  • Dead connection detection enabled
  • Automatic reconnection on failure

HTTP Session Configuration

  • aiohttp.ClientSession created once, reused
  • TCPConnector with appropriate limits
  • Per-host connection limit set
  • DNS cache TTL configured
  • SSL context configured if needed

Lifecycle Management

  • Pools created at application startup
  • Pools closed at application shutdown
  • Graceful shutdown waits for active connections
  • Context managers used for connection checkout

Monitoring

  • Pool size metrics exposed
  • Available connections tracked
  • Wait time for connections monitored
  • Connection errors logged
  • Alerts for pool exhaustion

Testing

  • Pool behavior tested under load
  • Connection failure scenarios tested
  • Timeout handling verified
  • Pool recovery after failure tested

Performance

  • Pool sizing validated with load tests
  • Connection reuse verified (no per-request pools)
  • Latency impact of pool_pre_ping measured
  • max_overflow tuned for burst patterns

FastAPI Production Checklist

Application Setup

Lifespan Management

  • Use asynccontextmanager lifespan (not deprecated events)

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # startup
        yield
        # shutdown
  • Initialize all connections in lifespan:

    • Database engine with connection pool
    • Redis client
    • Task queue (ARQ/Celery)
    • LLM clients
  • Cleanup all resources on shutdown (reverse order)

  • Verify connections on startup (ping/SELECT 1)

  • Handle graceful shutdown with active connections

Configuration

  • Use Pydantic Settings for configuration:

    class Settings(BaseSettings):
        model_config = SettingsConfigDict(env_file=".env")
  • Validate settings on startup

  • Use @lru_cache for settings singleton

  • Don't hardcode secrets

Response Class

  • Use ORJSONResponse for better performance:
    app = FastAPI(default_response_class=ORJSONResponse)

Middleware Stack

Order (add in reverse)

  • Rate Limiting (innermost)
  • Authentication
  • Logging
  • Timing
  • Request ID
  • CORS (outermost)

Required Middleware

  • CORS: Configure for your domains

    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
  • Request ID: Generate unique ID per request

  • Timing: Track response time

  • Logging: Structured request logging

Dependency Injection

Database Session

  • Use async session with proper transaction handling:
    async def get_db(request: Request):
        async with AsyncSession(request.app.state.db_engine) as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

Service Dependencies

  • Inject dependencies, don't instantiate in routes
  • Use factories for complex dependencies
  • Consider dependency caching for expensive operations

Error Handling

Exception Handlers

  • Register handler for custom exceptions
  • Register handler for validation errors
  • Register handler for database errors
  • Register catch-all for unexpected errors

RFC 9457 Problem Details

  • Return application/problem+json for errors
  • Include required fields: type, status
  • Include trace ID in error responses
  • Don't leak internal details in production

Security

Authentication

  • Use HTTPBearer for JWT:
    security = HTTPBearer()
  • Validate tokens in dependency
  • Set appropriate token expiry (15min access, 7d refresh)
  • Use bcrypt for password hashing (cost >= 12)

Input Validation

  • Use Pydantic models for all request bodies
  • Validate path/query parameters
  • Sanitize user input
  • Use Field() constraints

Headers

  • Add security headers:
    • X-Content-Type-Options: nosniff
    • X-Frame-Options: DENY
    • Strict-Transport-Security
  • Use HTTPS in production

Performance

Async Best Practices

  • Use async database driver (asyncpg)
  • Use async Redis client
  • Don't block event loop with sync operations
  • Use run_in_executor for blocking I/O if needed

Connection Pooling

  • Configure database pool size:

    create_async_engine(
        url,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,
    )
  • Configure Redis max connections

Caching

  • Cache expensive computations
  • Use proper TTLs
  • Implement cache invalidation

Observability

Logging

  • Use structured logging (structlog)
  • Include request ID in all logs
  • Log at appropriate levels
  • Don't log sensitive data

Metrics

  • Track request latency
  • Track error rates
  • Track cache hit rates
  • Track queue depth

Health Checks

  • Implement /health endpoint
  • Check all dependencies
  • Return proper status codes:
    • 200 for healthy
    • 503 for unhealthy

Documentation

OpenAPI

  • Add descriptions to all routes
  • Document all response codes
  • Include request/response examples
  • Tag routes appropriately

API Info

  • Set app title and description
  • Set version
  • Configure docs URL

Testing

Test Configuration

  • Use test database
  • Mock external services
  • Use pytest-asyncio

Test Coverage

  • Unit tests for business logic
  • Integration tests for routes
  • Test error handling
  • Test authentication

Deployment

Docker

  • Multi-stage build
  • Non-root user
  • Health check in Dockerfile

Kubernetes

  • Readiness probe
  • Liveness probe
  • Resource limits
  • Horizontal pod autoscaler

Environment Variables

  • All secrets from environment
  • Different configs per environment
  • Validate required variables on startup

Quick Reference

| Concern | Solution |
| --- | --- |
| Startup/Shutdown | asynccontextmanager lifespan |
| Config | Pydantic Settings |
| DB Session | Dependency with context manager |
| Auth | HTTPBearer + JWT validation |
| Errors | RFC 9457 Problem Details |
| Logging | Structlog with request ID |
| Response | ORJSONResponse |
| Middleware | CORS → RequestID → Timing → Logging → Auth → RateLimit |

SQLAlchemy 2.0 Async Checklist

Engine Configuration

  • Using create_async_engine (not create_engine)
  • Connection string uses async driver: postgresql+asyncpg://
  • pool_pre_ping=True enabled for connection validation
  • pool_size and max_overflow set appropriately
  • pool_recycle set to prevent stale connections (e.g., 3600)

Session Factory

  • Using async_sessionmaker (not sessionmaker)
  • expire_on_commit=False to prevent lazy load issues
  • autoflush=False for explicit control (optional)
  • Single factory instance shared across application

FastAPI Integration

  • Database dependency uses async with context manager
  • Session yielded to routes, not returned
  • Commit on success, rollback on exception
  • Session properly closed after request

Model Definition

  • Using Mapped[] type hints (SQLAlchemy 2.0 style)
  • mapped_column() instead of Column()
  • Relationships have explicit lazy= parameter
  • lazy="raise" to prevent accidental lazy loads

Eager Loading

  • Using selectinload() for collections
  • Using joinedload() for single relationships
  • All needed relationships loaded in query
  • No N+1 queries in response serialization

Bulk Operations

  • Using add_all() for multiple inserts
  • Chunking large inserts (1000-10000 per batch)
  • Using flush() between chunks for memory
  • Batch size tuned for performance
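A sketch of chunked inserts; the `chunked` helper is ours, and `bulk_insert` assumes an `AsyncSession` plus ORM instances in `rows`:

```python
from itertools import islice
from typing import Iterable, Iterator

def chunked(rows: Iterable, size: int) -> Iterator[list]:
    """Yield successive lists of at most `size` items."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

async def bulk_insert(session, rows: Iterable, chunk_size: int = 5000) -> None:
    # Flushing per chunk bounds memory and keeps each INSERT batch a
    # sane size; commit once at the end for a single transaction
    for batch in chunked(rows, chunk_size):
        session.add_all(batch)
        await session.flush()
    await session.commit()
```

Tune `chunk_size` empirically; 1000-10000 is a common starting range.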

Concurrency

  • One AsyncSession per task/request (never shared)
  • Not using scoped_session with async
  • Concurrent queries use separate sessions
  • Connection pool sized for concurrent load

Error Handling

  • Proper exception handling around DB operations
  • Rollback on errors before re-raising
  • Connection errors handled gracefully
  • Retry logic for transient failures

Testing

  • Using test database (not production)
  • Transactions rolled back after each test
  • Async test fixtures with pytest-asyncio
  • Database state isolated between tests

Performance

  • Indexes on frequently queried columns
  • EXPLAIN ANALYZE run on slow queries
  • Connection pool metrics monitored
  • Query execution time logged

Examples (4)

Asyncio Advanced Examples

Example 1: Concurrent API Fetching with TaskGroup

import asyncio
import aiohttp
from dataclasses import dataclass

@dataclass
class UserData:
    profile: dict
    orders: list
    preferences: dict

async def fetch_user_dashboard(user_id: str) -> UserData:
    """Fetch all user data concurrently with proper error handling."""
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            profile_task = tg.create_task(
                fetch_json(session, f"/api/users/{user_id}")
            )
            orders_task = tg.create_task(
                fetch_json(session, f"/api/users/{user_id}/orders")
            )
            prefs_task = tg.create_task(
                fetch_json(session, f"/api/users/{user_id}/preferences")
            )

        return UserData(
            profile=profile_task.result(),
            orders=orders_task.result(),
            preferences=prefs_task.result(),
        )

async def fetch_json(session: aiohttp.ClientSession, path: str) -> dict:
    async with session.get(f"https://api.example.com{path}") as resp:
        return await resp.json()

Example 2: Rate-Limited Bulk Processing

import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar("T")
R = TypeVar("R")

class BulkProcessor:
    """Process items with concurrency and rate limiting."""

    def __init__(self, max_concurrent: int = 10, timeout: float = 30.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout

    async def process_all(
        self,
        items: list[T],
        processor: Callable[[T], Awaitable[R]],
    ) -> tuple[list[R], list[tuple[T, Exception]]]:
        """Process all items, returning successes and failures."""
        successes: list[R] = []
        failures: list[tuple[T, Exception]] = []

        async def process_one(item: T) -> tuple[T, R | Exception]:
            async with self.semaphore:
                try:
                    async with asyncio.timeout(self.timeout):
                        result = await processor(item)
                        return (item, result)
                except Exception as e:
                    return (item, e)

        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(process_one(item)) for item in items]

        for task in tasks:
            item, result = task.result()
            if isinstance(result, Exception):
                failures.append((item, result))
            else:
                successes.append(result)

        return successes, failures

# Usage
processor = BulkProcessor(max_concurrent=5, timeout=10.0)
successes, failures = await processor.process_all(
    items=urls,
    processor=fetch_and_parse,
)
print(f"Processed {len(successes)}, failed {len(failures)}")

Example 3: Graceful Shutdown with Cleanup

import asyncio
import signal

class ShutdownRequested(Exception):
    """Sentinel used to abort the TaskGroup. Raising CancelledError from a
    child task is treated as an ordinary cancellation and would NOT cancel
    the sibling tasks."""

class AsyncService:
    def __init__(self):
        self._shutdown_event = asyncio.Event()

    async def start(self):
        """Start service with graceful shutdown handling."""
        loop = asyncio.get_running_loop()

        # Handle signals with a synchronous callback
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._shutdown_event.set)

        try:
            async with asyncio.TaskGroup() as tg:
                tg.create_task(self.worker_loop())
                tg.create_task(self.health_check_loop())
                tg.create_task(self._wait_for_shutdown())
        except* ShutdownRequested:
            pass  # Expected on shutdown; siblings were cancelled by the group

    async def _wait_for_shutdown(self):
        await self._shutdown_event.wait()
        raise ShutdownRequested

    async def shutdown(self):
        """Graceful shutdown - complete current work."""
        print("Shutting down...")
        self._shutdown_event.set()

    async def worker_loop(self):
        while not self._shutdown_event.is_set():
            try:
                async with asyncio.timeout(5.0):
                    await self.process_next_job()
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                print("Worker cancelled, finishing current job...")
                raise

    async def health_check_loop(self):
        while not self._shutdown_event.is_set():
            await asyncio.sleep(30)
            await self.health_check()

Example 4: Exception Group Handling

import asyncio

async def fetch_from_multiple_sources(query: str) -> list[dict]:
    """Try multiple sources, collect partial results on failures."""
    sources = ["source_a", "source_b", "source_c"]
    results = []
    errors = []

    try:
        async with asyncio.TaskGroup() as tg:
            tasks = {
                source: tg.create_task(fetch_from_source(source, query))
                for source in sources
            }
    except* (ConnectionError, TimeoutError) as eg:
        # One handler for both error types: with separate except* blocks,
        # both could match the same ExceptionGroup and the result-collection
        # loop would run twice, duplicating successes
        errors.extend(eg.exceptions)
        results = [
            task.result()
            for task in tasks.values()
            if task.done() and not task.cancelled() and task.exception() is None
        ]
    else:
        # All succeeded
        results = [t.result() for t in tasks.values()]

    if errors:
        print(f"Partial results: {len(results)} succeeded, {len(errors)} failed")

    return results

Example 5: Async Context Manager with Resource Pool

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

class ConnectionPool:
    """Async connection pool with proper lifecycle management."""

    def __init__(self, max_size: int = 10):
        self._max_size = max_size
        self._semaphore = asyncio.Semaphore(max_size)
        self._connections: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self._dsn: str | None = None

    async def initialize(self, dsn: str):
        """Pre-create connections."""
        self._dsn = dsn  # kept so acquire() can replace unhealthy connections
        for _ in range(self._max_size):
            conn = await create_connection(dsn)
            await self._connections.put(conn)

    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[Connection]:
        """Get a connection from the pool."""
        async with self._semaphore:
            conn = await self._connections.get()
            try:
                yield conn
            finally:
                # Return connection to pool
                if conn.is_healthy():
                    await self._connections.put(conn)
                else:
                    # Replace unhealthy connection
                    new_conn = await create_connection(self._dsn)
                    await self._connections.put(new_conn)

    async def close(self):
        """Close all connections."""
        while not self._connections.empty():
            conn = await self._connections.get()
            await conn.close()

# Usage
pool = ConnectionPool(max_size=20)
await pool.initialize("postgres://...")

async with pool.acquire() as conn:
    result = await conn.execute("SELECT * FROM users")

Connection Pooling Examples

SQLAlchemy Async Pool with FastAPI

from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.pool import AsyncAdaptedQueuePool

# Configure engine with production settings
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/db",

    # Pool sizing for medium-load service
    pool_size=20,
    max_overflow=10,

    # Connection health
    pool_pre_ping=True,
    pool_recycle=1800,  # 30 minutes

    # Timeouts
    pool_timeout=30,

    # Use queue pool (default for async)
    poolclass=AsyncAdaptedQueuePool,

    # Connection settings
    connect_args={
        "command_timeout": 60,
        "server_settings": {
            "statement_timeout": "60000",
            "lock_timeout": "10000",
        },
    },

    # Echo SQL in development
    echo=False,
)

async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage connection pool lifecycle."""
    # Startup: pool is lazy-initialized on first use
    yield
    # Shutdown: close all connections
    await engine.dispose()


app = FastAPI(lifespan=lifespan)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency for database sessions."""
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Connection checked out, used, returned automatically."""
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    return result.scalar_one_or_none()

Direct asyncpg Pool

import asyncpg
import json
from contextlib import asynccontextmanager
from fastapi import FastAPI

pool: asyncpg.Pool | None = None


async def setup_connection(conn: asyncpg.Connection):
    """Run once for each new connection created by the pool."""
    await conn.execute("SET timezone TO 'UTC'")
    await conn.execute("SET statement_timeout TO '60s'")
    await conn.execute("SET lock_timeout TO '10s'")
    # Register custom type codecs
    await conn.set_type_codec(
        'json',
        encoder=json.dumps,
        decoder=json.loads,
        schema='pg_catalog'
    )


@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool

    # Create pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost:5432/db",

        # Pool sizing
        min_size=10,
        max_size=20,

        # Connection lifecycle
        max_inactive_connection_lifetime=300,  # 5 min idle timeout
        max_queries=50000,  # Recreate after N queries

        # Timeouts
        command_timeout=60,
        timeout=30,

        # Per-connection initialization hook (session settings and codecs
        # persist on the connection, so `init` runs once per connection;
        # `setup` would rerun on every acquire)
        init=setup_connection,
    )

    yield

    # Close pool
    await pool.close()


app = FastAPI(lifespan=lifespan)


@app.get("/data/{id}")
async def get_data(id: int):
    """Acquire connection from pool."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM data WHERE id = $1", id
        )
        return dict(row) if row else None


@app.post("/batch")
async def batch_operation(items: list[dict]):
    """Use transaction for batch operations."""
    async with pool.acquire() as conn:
        async with conn.transaction():
            for item in items:
                await conn.execute(
                    "INSERT INTO items (name, value) VALUES ($1, $2)",
                    item["name"], item["value"]
                )
    return {"inserted": len(items)}

aiohttp Session with Connection Pool

import aiohttp
from aiohttp import ClientTimeout, TCPConnector
from contextlib import asynccontextmanager
from fastapi import FastAPI

http_session: aiohttp.ClientSession | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global http_session

    # Configure connector
    connector = TCPConnector(
        # Connection limits
        limit=100,          # Total connections
        limit_per_host=20,  # Per-host limit

        # Keep-alive
        keepalive_timeout=30,
        enable_cleanup_closed=True,

        # DNS caching
        ttl_dns_cache=300,
        use_dns_cache=True,

        # SSL (for production, use proper SSL context)
        ssl=False,
    )

    # Configure timeouts
    timeout = ClientTimeout(
        total=30,       # Total request timeout
        connect=10,     # Connection timeout
        sock_read=20,   # Read timeout
        sock_connect=5, # Socket connect timeout
    )

    # Create session
    http_session = aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
        headers={"User-Agent": "MyApp/1.0"},
    )

    yield

    # Close session (releases all connections)
    await http_session.close()


app = FastAPI(lifespan=lifespan)


@app.get("/external/{resource}")
async def fetch_external(resource: str):
    """Reuse connection from pool."""
    async with http_session.get(
        f"https://api.example.com/{resource}"
    ) as response:
        return await response.json()


@app.post("/webhook")
async def send_webhook(payload: dict):
    """POST with connection reuse."""
    async with http_session.post(
        "https://hooks.example.com/incoming",
        json=payload,
        headers={"X-Webhook-Token": "secret"},
    ) as response:
        return {"status": response.status}

Pool Health Monitoring

import asyncio
from prometheus_client import Gauge, Counter, Histogram
from sqlalchemy import event, text

# Prometheus metrics
pool_size_gauge = Gauge(
    "db_pool_size", "Current pool size"
)
pool_checked_out = Gauge(
    "db_pool_checked_out", "Connections in use"
)
pool_overflow = Gauge(
    "db_pool_overflow", "Overflow connections"
)
pool_checkout_time = Histogram(
    "db_pool_checkout_seconds",
    "Time to acquire connection",
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
pool_errors = Counter(
    "db_pool_errors_total",
    "Pool errors",
    ["type"]
)


def setup_pool_monitoring(engine):
    """Register SQLAlchemy pool event listeners."""

    @event.listens_for(engine.sync_engine, "checkout")
    def on_checkout(dbapi_conn, connection_record, connection_proxy):
        connection_record.info["checkout_time"] = asyncio.get_event_loop().time()

    @event.listens_for(engine.sync_engine, "checkin")
    def on_checkin(dbapi_conn, connection_record):
        if "checkout_time" in connection_record.info:
            duration = asyncio.get_event_loop().time() - connection_record.info["checkout_time"]
            pool_checkout_time.observe(duration)

    @event.listens_for(engine.sync_engine, "connect")
    def on_connect(dbapi_conn, connection_record):
        pool_size_gauge.inc()

    @event.listens_for(engine.sync_engine, "close")
    def on_close(dbapi_conn, connection_record):
        pool_size_gauge.dec()


async def collect_pool_metrics(engine):
    """Collect pool metrics periodically."""
    while True:
        pool = engine.pool
        pool_size_gauge.set(pool.size())
        pool_checked_out.set(pool.checkedout())
        pool_overflow.set(pool.overflow())
        await asyncio.sleep(10)


# Health check endpoint
from fastapi.responses import JSONResponse

@app.get("/health/db")
async def db_health():
    """Check database connectivity."""
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        return {
            "status": "healthy",
            "pool_size": engine.pool.size(),
            "checked_out": engine.pool.checkedout(),
            "overflow": engine.pool.overflow(),
        }
    except Exception as e:
        pool_errors.labels(type="health_check").inc()
        # FastAPI ignores Flask-style `body, status` tuples; set the
        # status code explicitly on the response
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "error": str(e)},
        )

Connection Retry with Backoff

import asyncio
from functools import wraps
from sqlalchemy.exc import OperationalError, DisconnectionError

def with_db_retry(max_retries: int = 3, base_delay: float = 0.1):
    """Decorator for database operations with retry."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except (OperationalError, DisconnectionError) as e:
                    last_error = e
                    if attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)
                        await asyncio.sleep(delay)
                        continue
                    raise
            raise last_error
        return wrapper
    return decorator


@with_db_retry(max_retries=3)
async def get_user_with_retry(db: AsyncSession, user_id: int):
    """Database operation with automatic retry on connection errors."""
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    return result.scalar_one_or_none()

Multi-Database Pool Management

from dataclasses import dataclass
from typing import Dict

@dataclass
class DatabaseConfig:
    url: str
    pool_size: int = 10
    max_overflow: int = 5


class MultiDatabaseManager:
    """Manage multiple database connection pools."""

    def __init__(self, configs: Dict[str, DatabaseConfig]):
        self.engines: Dict[str, AsyncEngine] = {}
        self.session_makers: Dict[str, async_sessionmaker] = {}

        for name, config in configs.items():
            engine = create_async_engine(
                config.url,
                pool_size=config.pool_size,
                max_overflow=config.max_overflow,
                pool_pre_ping=True,
            )
            self.engines[name] = engine
            self.session_makers[name] = async_sessionmaker(
                engine,
                class_=AsyncSession,
                expire_on_commit=False,
            )

    def get_session(self, db_name: str) -> AsyncSession:
        """Get session for specific database."""
        return self.session_makers[db_name]()

    async def close_all(self):
        """Close all connection pools."""
        for engine in self.engines.values():
            await engine.dispose()


# Usage
db_manager = MultiDatabaseManager({
    "primary": DatabaseConfig(
        url="postgresql+asyncpg://user:pass@primary:5432/db",
        pool_size=20,
    ),
    "replica": DatabaseConfig(
        url="postgresql+asyncpg://user:pass@replica:5432/db",
        pool_size=30,  # More for read-heavy
    ),
    "analytics": DatabaseConfig(
        url="postgresql+asyncpg://user:pass@analytics:5432/db",
        pool_size=5,  # Less for batch jobs
    ),
})


@app.get("/users/{id}")
async def get_user(id: int):
    """Read from replica."""
    async with db_manager.get_session("replica") as session:
        result = await session.execute(
            select(User).where(User.id == id)
        )
        return result.scalar_one_or_none()


@app.post("/users")
async def create_user(user: UserCreate):
    """Write to primary."""
    async with db_manager.get_session("primary") as session:
        db_user = User(**user.model_dump())
        session.add(db_user)
        await session.commit()
        return db_user

FastAPI Lifespan Management

Complete examples for managing application lifecycle in FastAPI.

Basic Lifespan

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan context manager.

    Code before yield runs on startup.
    Code after yield runs on shutdown.
    """
    # STARTUP
    print("Application starting...")
    app.state.started_at = datetime.now(timezone.utc)

    yield  # Application runs here

    # SHUTDOWN
    print("Application shutting down...")


app = FastAPI(lifespan=lifespan)

Full Production Lifespan

from contextlib import asynccontextmanager
from datetime import datetime, timezone
import structlog

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
import redis.asyncio as redis

from app.core.config import settings

logger = structlog.get_logger()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Production lifespan with full resource management.

    Initializes:
    - Database connection pool
    - Redis connection
    - Background task queue
    - LLM clients
    """
    logger.info("application_starting", version=settings.app_version)

    # =====================================================================
    # STARTUP
    # =====================================================================

    # 1. Database Engine
    logger.info("initializing_database")
    app.state.db_engine = create_async_engine(
        settings.database_url,
        pool_size=settings.db_pool_size,
        max_overflow=settings.db_max_overflow,
        pool_pre_ping=True,  # Verify connections
        echo=settings.debug,
    )

    # Verify database connection
    async with app.state.db_engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    logger.info("database_connected")

    # 2. Redis
    logger.info("initializing_redis")
    app.state.redis = redis.from_url(
        settings.redis_url,
        encoding="utf-8",
        decode_responses=True,
        max_connections=20,
    )
    await app.state.redis.ping()
    logger.info("redis_connected")

    # 3. Task Queue (ARQ)
    logger.info("initializing_task_queue")
    from arq import create_pool
    from arq.connections import RedisSettings

    app.state.task_queue = await create_pool(
        RedisSettings.from_dsn(settings.redis_url)
    )
    logger.info("task_queue_connected")

    # 4. LLM Clients
    logger.info("initializing_llm_clients")
    from app.services.llm import LLMService

    app.state.llm = LLMService(
        openai_key=settings.openai_api_key,
        anthropic_key=settings.anthropic_api_key,
    )
    logger.info("llm_clients_initialized")

    # 5. Embeddings Service
    logger.info("initializing_embeddings")
    from app.services.embeddings import EmbeddingsService

    app.state.embeddings = EmbeddingsService(
        model=settings.embedding_model,
    )
    # Warmup embedding model
    await app.state.embeddings.embed("warmup")
    logger.info("embeddings_initialized")

    # Record startup time
    app.state.started_at = datetime.now(timezone.utc)
    logger.info("application_started")

    # =====================================================================
    # APPLICATION RUNS HERE
    # =====================================================================
    yield

    # =====================================================================
    # SHUTDOWN
    # =====================================================================
    logger.info("application_stopping")

    # Close in reverse order
    logger.info("closing_embeddings")
    await app.state.embeddings.close()

    logger.info("closing_llm_clients")
    await app.state.llm.close()

    logger.info("closing_task_queue")
    await app.state.task_queue.close()

    logger.info("closing_redis")
    await app.state.redis.aclose()  # redis-py >= 5; use close() on older versions

    logger.info("closing_database")
    await app.state.db_engine.dispose()

    logger.info("application_stopped")


# Create app with lifespan
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    lifespan=lifespan,
)
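Because `app.state` is untyped, a misspelled attribute only surfaces at request time. An optional refinement is to gather everything the lifespan creates into one typed container so a type checker can catch mistakes; `AppResources` and its fields below are illustrative names, not part of FastAPI.

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class AppResources:
    """One typed container for everything the lifespan creates."""
    db_engine: Any
    redis: Any
    task_queue: Any

# In lifespan:  app.state.resources = AppResources(engine, redis_client, queue)
# In routes:    request.app.state.resources.db_engine
resources = AppResources(db_engine="engine", redis="redis", task_queue="queue")
```

Attribute typos (`resources.redus`) then fail static analysis instead of returning `None` at runtime.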

Accessing App State in Routes

import json
from typing import AsyncGenerator

from fastapi import APIRouter, Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter()


async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    """Generator dependency that yields a database session."""
    async with AsyncSession(
        request.app.state.db_engine,
        expire_on_commit=False,
    ) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def get_redis(request: Request):
    """Dependency to get Redis client."""
    return request.app.state.redis


async def get_task_queue(request: Request):
    """Dependency to get task queue."""
    return request.app.state.task_queue


@router.get("/analyses/{id}")
async def get_analysis(
    id: str,
    db: AsyncSession = Depends(get_db),
    redis=Depends(get_redis),
):
    # Try cache first
    cached = await redis.get(f"analysis:{id}")
    if cached:
        return json.loads(cached)

    # Query database
    analysis = await db.get(AnalysisModel, id)
    return analysis


@router.post("/analyses")
async def create_analysis(
    request: CreateAnalysisRequest,
    db: AsyncSession = Depends(get_db),
    queue=Depends(get_task_queue),
):
    # Create record (Pydantic v2: model_dump(); use .dict() on v1)
    analysis = AnalysisModel(**request.model_dump())
    db.add(analysis)
    await db.commit()

    # Enqueue background processing
    await queue.enqueue_job("process_analysis", analysis_id=str(analysis.id))

    return analysis

Health Check Using App State

@router.get("/health")
async def health_check(request: Request):
    """Check health of all dependencies."""
    checks = {}

    # Database
    try:
        async with request.app.state.db_engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        checks["database"] = "healthy"
    except Exception as e:
        checks["database"] = f"unhealthy: {e}"

    # Redis
    try:
        await request.app.state.redis.ping()
        checks["redis"] = "healthy"
    except Exception as e:
        checks["redis"] = f"unhealthy: {e}"

    # LLM
    try:
        if request.app.state.llm.is_available():
            checks["llm"] = "healthy"
        else:
            checks["llm"] = "unhealthy: no providers available"
    except Exception as e:
        checks["llm"] = f"unhealthy: {e}"

    # Overall status
    all_healthy = all(v == "healthy" for v in checks.values())
    status = "healthy" if all_healthy else "degraded"

    return {
        "status": status,
        "checks": checks,
        "uptime_seconds": (datetime.now(timezone.utc) - request.app.state.started_at).total_seconds(),
    }

Graceful Shutdown

import signal
import asyncio
from contextlib import asynccontextmanager

# Track active connections
active_connections: set = set()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Register shutdown signals on the running loop. loop.add_signal_handler
    # (Unix only) runs the callback inside the event loop, where creating
    # tasks is safe; a plain signal.signal handler may fire with no running
    # loop in scope. Note: servers like uvicorn install their own SIGTERM
    # handler, which this replaces.
    loop = asyncio.get_running_loop()

    def handle_shutdown(signum: int) -> None:
        logger.info("shutdown_signal_received", signal=signum)
        asyncio.create_task(graceful_shutdown())

    loop.add_signal_handler(signal.SIGTERM, handle_shutdown, signal.SIGTERM)
    loop.add_signal_handler(signal.SIGINT, handle_shutdown, signal.SIGINT)

    # Startup
    app.state.shutting_down = False
    yield

    # Shutdown
    logger.info("waiting_for_active_connections", count=len(active_connections))
    # Wait for active requests (up to 30 seconds)
    for _ in range(30):
        if not active_connections:
            break
        await asyncio.sleep(1)

    logger.info("shutdown_complete")


async def graceful_shutdown():
    """Initiate graceful shutdown."""
    app.state.shutting_down = True
    logger.info("graceful_shutdown_initiated")


# Middleware to track active connections
from fastapi.responses import JSONResponse

@app.middleware("http")
async def track_connections(request: Request, call_next):
    if app.state.shutting_down:
        return JSONResponse(
            status_code=503,
            content={"detail": "Server is shutting down"},
        )

    connection_id = id(request)
    active_connections.add(connection_id)
    try:
        return await call_next(request)
    finally:
        active_connections.discard(connection_id)
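The signal wiring above can be hard to picture without a server running. Here is a minimal, self-contained sketch of the `loop.add_signal_handler` pattern: the handler just sets an `asyncio.Event`, and the app waits on it before cleaning up. The simulated `SIGTERM` stands in for one sent by a process manager.

```python
import asyncio
import signal

async def serve_until_signal() -> bool:
    """Run until SIGTERM arrives, then fall through to cleanup."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # The callback runs inside the event loop, so asyncio calls are safe here.
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    # Simulate the process manager sending SIGTERM shortly after startup.
    loop.call_later(0.01, signal.raise_signal, signal.SIGTERM)
    await stop.wait()
    return stop.is_set()

received = asyncio.run(serve_until_signal())
```

`add_signal_handler` is Unix-only; on Windows you would fall back to `signal.signal` or rely on the server's own shutdown handling.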

Testing with Lifespan

# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from app.main import app


@pytest.fixture
async def client():
    """Test client with mocked dependencies."""
    # Override app state for testing
    app.state.db_engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=True,
    )
    app.state.redis = FakeRedis()           # e.g. fakeredis.aioredis.FakeRedis
    app.state.task_queue = FakeTaskQueue()  # in-memory stub of the ARQ pool

    # httpx >= 0.27 removed the app= shortcut; pass an ASGITransport instead
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        yield client


@pytest.fixture
async def db_session(client):
    """Get database session for tests."""
    async with AsyncSession(app.state.db_engine) as session:
        yield session

Environment-Specific Lifespan

from app.core.config import settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan with environment-specific behavior."""

    if settings.environment == "test":
        # Minimal setup for tests
        app.state.db_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
        app.state.redis = FakeRedis()
        yield
        return

    if settings.environment == "development":
        # Development setup
        app.state.db_engine = create_async_engine(
            settings.database_url,
            echo=True,  # SQL logging
        )
        app.state.redis = redis.from_url(settings.redis_url)
        yield
        await app.state.redis.close()
        await app.state.db_engine.dispose()
        return

    # Production setup (full initialization)
    # ... full production initialization code ...
    yield
    # ... full cleanup code ...

SQLAlchemy 2.0 Async Examples

Example 1: Complete FastAPI Setup

# app/db/engine.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,
    echo=False,
)

async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


# app/api/deps.py
from typing import AsyncGenerator

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


# app/api/routes/users.py
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import selectinload

router = APIRouter()

@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(404, "User not found")
    return user

Example 2: Model with Proper Type Hints

from datetime import datetime, timezone
from uuid import UUID, uuid4
from sqlalchemy import String, ForeignKey, DateTime
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID as PG_UUID

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[UUID] = mapped_column(
        PG_UUID(as_uuid=True),
        primary_key=True,
        default=uuid4,
    )
    email: Mapped[str] = mapped_column(
        String(255),
        unique=True,
        index=True,
    )
    name: Mapped[str] = mapped_column(String(100))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )

    # Prevent accidental lazy loading
    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",
    )

class Order(Base):
    __tablename__ = "orders"

    id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4)
    user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
    total_cents: Mapped[int]
    status: Mapped[str] = mapped_column(String(20), default="pending")

    user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")
    items: Mapped[list["OrderItem"]] = relationship(lazy="raise")

Example 3: Repository Pattern

from typing import Generic, TypeVar
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T", bound=Base)

class AsyncRepository(Generic[T]):
    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)

    async def get_by_ids(self, ids: list[UUID]) -> list[T]:
        if not ids:
            return []
        result = await self.session.execute(
            select(self.model).where(self.model.id.in_(ids))
        )
        return list(result.scalars().all())

    async def list(
        self,
        *,
        offset: int = 0,
        limit: int = 100,
    ) -> list[T]:
        result = await self.session.execute(
            select(self.model).offset(offset).limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, instance: T) -> None:
        await self.session.delete(instance)
        await self.session.flush()


# Usage
class UserRepository(AsyncRepository[User]):
    def __init__(self, session: AsyncSession):
        super().__init__(session, User)

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def get_with_orders(self, user_id: UUID) -> User | None:
        result = await self.session.execute(
            select(User)
            .options(selectinload(User.orders))
            .where(User.id == user_id)
        )
        return result.scalar_one_or_none()

Example 4: Bulk Operations

async def bulk_create_users(
    db: AsyncSession,
    users_data: list[dict],
    chunk_size: int = 1000,
) -> int:
    """Efficiently insert many users in chunks."""
    total = 0

    for i in range(0, len(users_data), chunk_size):
        chunk = users_data[i:i + chunk_size]
        users = [User(**data) for data in chunk]
        db.add_all(users)
        await db.flush()  # Get IDs, manage memory
        total += len(chunk)

    return total


async def bulk_update_status(
    db: AsyncSession,
    order_ids: list[UUID],
    new_status: str,
) -> int:
    """Bulk update using UPDATE statement."""
    from sqlalchemy import update

    result = await db.execute(
        update(Order)
        .where(Order.id.in_(order_ids))
        .values(status=new_status)
    )
    return result.rowcount
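The bulk helpers above assume every row is new. On PostgreSQL, idempotent bulk writes can use the dialect-specific `insert` with `ON CONFLICT`. This sketch only compiles the statement against the PostgreSQL dialect to show the generated SQL shape; the table is illustrative, and in real code you would `await db.execute(stmt)` instead.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

users = Table(
    "users", MetaData(),
    Column("id", Integer, primary_key=True),
    Column("email", String, unique=True),
    Column("name", String),
)

stmt = insert(users).values(email="ada@example.com", name="Ada")
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],           # conflict target (unique column)
    set_={"name": stmt.excluded.name},  # take the incoming value on conflict
)

# Compile against the PostgreSQL dialect just to inspect the SQL shape.
sql = str(stmt.compile(dialect=postgresql.dialect()))
```

Combined with `values()` taking a list of dicts, this upserts a whole chunk in one round trip.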

Example 5: Transaction Management

from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_funds(
    db: AsyncSession,
    from_account_id: UUID,
    to_account_id: UUID,
    amount: int,
) -> None:
    """Transfer with explicit transaction and row locking."""
    # Note: db.begin() raises if a transaction is already open (e.g. the
    # session has autobegun work); use db.begin_nested() for a savepoint then.
    async with db.begin():  # Explicit transaction
        # Lock rows to prevent concurrent modification
        from_account = await db.get(
            Account,
            from_account_id,
            with_for_update=True,
        )
        to_account = await db.get(
            Account,
            to_account_id,
            with_for_update=True,
        )

        if not from_account or not to_account:
            raise ValueError("Account not found")

        if from_account.balance < amount:
            raise ValueError("Insufficient funds")

        from_account.balance -= amount
        to_account.balance += amount

        # Transaction commits on exit, rolls back on exception

Example 6: Complex Queries with Joins

from sqlalchemy import select, func, and_
from sqlalchemy.orm import selectinload, joinedload

async def get_user_order_summary(
    db: AsyncSession,
    user_id: UUID,
) -> dict | None:
    """Get user with order statistics."""
    # Get user with eager-loaded orders
    user_result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .where(User.id == user_id)
    )
    user = user_result.scalar_one_or_none()

    if not user:
        return None

    # Get aggregate stats
    stats_result = await db.execute(
        select(
            func.count(Order.id).label("total_orders"),
            func.sum(Order.total_cents).label("total_spent"),
            func.avg(Order.total_cents).label("avg_order"),
        )
        .where(Order.user_id == user_id)
    )
    stats = stats_result.one()

    return {
        "user": user,
        "total_orders": stats.total_orders,
        "total_spent_cents": stats.total_spent or 0,
        "avg_order_cents": float(stats.avg_order or 0),
    }