Database Patterns
Database design and migration patterns for Alembic migrations, schema design (SQL/NoSQL), and database versioning. Use when creating migrations, designing schemas, normalizing data, managing database versions, or handling schema drift.
Primary Agent: database-engineer
Database Patterns
Comprehensive patterns for database migrations, schema design, and version management. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Alembic Migrations | 3 | CRITICAL | Autogenerate, data migrations, branch management |
| Schema Design | 3 | HIGH | Normalization, indexing strategies, NoSQL patterns |
| Versioning | 3 | HIGH | Changelogs, rollback plans, schema drift detection |
| Zero-Downtime Migration | 2 | CRITICAL | Expand-contract, pgroll, rollback monitoring |
| Database Selection | 1 | HIGH | Choosing the right database, PostgreSQL vs MongoDB, cost analysis |
Total: 12 rules across 5 categories
Quick Start
# Alembic: Auto-generate migration from model changes
# alembic revision --autogenerate -m "add user preferences"
def upgrade() -> None:
op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))
op.execute("UPDATE users SET org_id = 'default-org-uuid' WHERE org_id IS NULL")
def downgrade() -> None:
op.drop_column('users', 'org_id')
-- Schema: Normalization to 3NF with proper indexing
CREATE TABLE orders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL REFERENCES customers(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
Alembic Migrations
Migration management with Alembic for SQLAlchemy 2.0 async applications.
| Rule | File | Key Pattern |
|---|---|---|
| Autogenerate | rules/alembic-autogenerate.md | Auto-generate from models, async env.py, review workflow |
| Data Migration | rules/alembic-data-migration.md | Batch backfill, two-phase NOT NULL, zero-downtime |
| Branching | rules/alembic-branching.md | Feature branches, merge migrations, conflict resolution |
Schema Design
SQL and NoSQL schema design with normalization, indexing, and constraint patterns.
| Rule | File | Key Pattern |
|---|---|---|
| Normalization | rules/schema-normalization.md | 1NF-3NF, when to denormalize, JSON vs normalized |
| Indexing | rules/schema-indexing.md | B-tree, GIN, HNSW, partial/covering indexes |
| NoSQL Patterns | rules/schema-nosql.md | Embed vs reference, document design, sharding |
Versioning
Database version control and change management across environments.
| Rule | File | Key Pattern |
|---|---|---|
| Changelog | rules/versioning-changelog.md | Schema version table, semantic versioning, audit trails |
| Rollback | rules/versioning-rollback.md | Rollback testing, destructive rollback docs, CI verification |
| Drift Detection | rules/versioning-drift.md | Environment sync, checksum verification, migration locks |
Database Selection
Decision frameworks for choosing the right database. Default: PostgreSQL.
| Rule | File | Key Pattern |
|---|---|---|
| Selection Guide | rules/db-selection.md | PostgreSQL-first, tier-based matrix, anti-patterns |
Key Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Async dialect | postgresql+asyncpg | Native async support for SQLAlchemy 2.0 |
| NOT NULL column | Two-phase: nullable first, then alter | Avoids locking, backward compatible |
| Large table index | CREATE INDEX CONCURRENTLY | Zero-downtime, no table locks |
| Normalization target | 3NF for OLTP | Reduces redundancy while maintaining query performance |
| Primary key strategy | UUID for distributed, INT for single-DB | Context-appropriate key generation |
| Soft deletes | deleted_at timestamp column | Preserves audit trail, enables recovery |
| Migration granularity | One logical change per file | Easier rollback and debugging |
| Production deployment | Generate SQL, review, then apply | Never auto-run in production |
Anti-Patterns (FORBIDDEN)
# NEVER: Add NOT NULL without default or two-phase approach
op.add_column('users', sa.Column('org_id', UUID, nullable=False)) # LOCKS TABLE!
# NEVER: Use blocking index creation on large tables
op.create_index('idx_large', 'big_table', ['col']) # Use CONCURRENTLY
# NEVER: Skip downgrade implementation
def downgrade():
pass # WRONG - implement proper rollback
# NEVER: Modify migration after deployment - create new migration instead
# NEVER: Run migrations automatically in production
# Use: alembic upgrade head --sql > review.sql
# NEVER: Run CONCURRENTLY inside transaction
op.execute("BEGIN; CREATE INDEX CONCURRENTLY ...; COMMIT;") # FAILS
# NEVER: Delete migration history
command.stamp(alembic_config, "head") # Loses history
# NEVER: Skip environments (Always: local -> CI -> staging -> production)
Detailed Documentation
| Resource | Description |
|---|---|
| references/ | Advanced patterns: Alembic, normalization, migration, audit, environment, versioning |
| checklists/ | Migration deployment and schema design checklists |
| examples/ | Complete migration examples, schema examples |
| scripts/ | Migration templates, model change detector |
Zero-Downtime Migration
Safe database schema changes without downtime using expand-contract pattern and online schema changes.
| Rule | File | Key Pattern |
|---|---|---|
| Expand-Contract | rules/migration-zero-downtime.md | Expand phase, backfill, contract phase, pgroll automation |
| Rollback & Monitoring | rules/migration-rollback.md | pgroll rollback, lock monitoring, replication lag, backfill progress |
Related Skills
- sqlalchemy-2-async - Async SQLAlchemy session patterns
- ork:testing-patterns - Comprehensive testing patterns, including migration testing
- caching - Cache layer design to complement database performance
- ork:performance - Performance optimization patterns
Rules (12)
Configure Alembic autogenerate for safe schema migration from model changes — CRITICAL
Alembic Autogenerate Migrations
Async env.py Configuration
# migrations/env.py
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
# Import your models' Base for autogenerate
from app.models.base import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=target_metadata,
literal_binds=True, dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.", poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Generate and Apply
# Generate from model changes
alembic revision --autogenerate -m "add user preferences"
# Apply migrations
alembic upgrade head
# Rollback one step
alembic downgrade -1
# Generate SQL for review (production)
alembic upgrade head --sql > migration.sql
# Check current revision
alembic current
# Show migration history
alembic history --verbose
Migration Template
"""Add users table.
Revision ID: abc123
Revises: None
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = 'abc123'
down_revision = None
def upgrade() -> None:
op.create_table('users',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
)
op.create_index('idx_users_email', 'users', ['email'], unique=True)
def downgrade() -> None:
op.drop_index('idx_users_email', table_name='users')
op.drop_table('users')
Key Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Review generated SQL | Always, before applying | Autogenerate is a starting point and may miss custom constraints |
| One logical change per file | Easier rollback and debugging | Atomic migration units |
| Always implement downgrade | Ensures reversibility | Required for safe rollback |
| Use postgresql+asyncpg | Native async support | SQLAlchemy 2.0 standard |
Incorrect — Missing target_metadata:
# env.py without target_metadata
from alembic import context
target_metadata = None # Autogenerate won't work!
def run_migrations_online():
context.configure(target_metadata=target_metadata)
Correct — Import all models via Base:
# env.py with proper metadata import
from app.models.base import Base # Imports all SQLAlchemy models
from alembic import context
target_metadata = Base.metadata # Autogenerate detects changes
def run_migrations_online():
context.configure(target_metadata=target_metadata)
Common Mistakes
- Skipping review of autogenerated migrations
- Running autogenerate without importing all models
- Using asyncio.run() when an event loop already exists (FastAPI lifespan)
- Missing target_metadata configuration
Handle Alembic migration branch conflicts with proper merge strategies — CRITICAL
Alembic Branching & Merge Patterns
Creating Feature Branches
# Create a feature branch
alembic revision --branch-label=feature_payments -m "start payments feature"
# Create revision on branch
alembic revision --head=feature_payments@head -m "add payment_methods table"
# View branch structure
alembic branches
# Merge branches before deployment
alembic merge feature_payments@head main@head -m "merge payments feature"
Merge Migration
"""Merge feature_payments and main branches.
Revision ID: merge_abc
Revises: ('abc123', 'def456')
"""
revision = 'merge_abc'
down_revision = ('abc123', 'def456') # Tuple for merge
def upgrade() -> None:
pass # No operations - marks merge point
def downgrade() -> None:
raise Exception("Cannot downgrade past merge point")Multi-Database Migrations
# alembic/env.py - Multi-database support
DATABASES = {
'default': 'postgresql://user:pass@localhost/main',
'analytics': 'postgresql://user:pass@localhost/analytics',
}
def run_migrations_online():
for db_name, url in DATABASES.items():
config = context.config
config.set_main_option('sqlalchemy.url', url)
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.', poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=get_metadata(db_name),
version_table=f'alembic_version_{db_name}',
)
with context.begin_transaction():
context.run_migrations()
Column Rename (Expand-Contract)
"""Phase 1: Add new column alongside old."""
def upgrade() -> None:
op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
# Create trigger to sync during transition
op.execute("""
CREATE OR REPLACE FUNCTION sync_user_name()
RETURNS TRIGGER AS $$
BEGIN
NEW.full_name = COALESCE(NEW.full_name, NEW.name);
NEW.name = COALESCE(NEW.name, NEW.full_name);
RETURN NEW;
END; $$ LANGUAGE plpgsql;
CREATE TRIGGER trg_sync_user_name
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_user_name();
""")
def downgrade() -> None:
op.execute("DROP TRIGGER IF EXISTS trg_sync_user_name ON users")
op.execute("DROP FUNCTION IF EXISTS sync_user_name()")
op.drop_column('users', 'full_name')
Key Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Column rename | 4-phase expand/contract | Safe migration without downtime |
| Branch merge | Merge before deployment | Prevents version conflicts |
| Multi-database | Separate version tables | Independent migration tracking |
| Transaction mode | Default on, disable for CONCURRENTLY | CONCURRENTLY requires no transaction |
Incorrect — Immediate column rename:
# Causes downtime - app breaks immediately
def upgrade():
op.alter_column('users', 'name', new_column_name='full_name')
Correct — Expand-contract pattern:
# Phase 1: Add new column, sync with trigger
def upgrade():
op.add_column('users', sa.Column('full_name', sa.String(255)))
op.execute("""
CREATE TRIGGER trg_sync_user_name
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_user_name()
""")
# Phase 2 (separate migration): Drop old column after app updated
Common Mistakes
- Merging branches without testing both paths first
- Skipping expand-contract for column renames (causes downtime)
- Using shared version table for multiple databases
- Not using --branch-label for feature isolation
Run Alembic data migrations safely with batch processing and two-phase approaches — CRITICAL
Alembic Data Migration Patterns
Two-Phase NOT NULL Migration
"""Add org_id column (phase 1 - nullable).
Phase 1: Add nullable column
Phase 2: Backfill data
Phase 3: Add NOT NULL (separate migration after verification)
"""
def upgrade() -> None:
# Phase 1: Add as nullable first
op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))
# Phase 2: Backfill with default org
op.execute("""
UPDATE users SET org_id = 'default-org-uuid' WHERE org_id IS NULL
""")
# Phase 3 in SEPARATE migration after app updated:
# op.alter_column('users', 'org_id', nullable=False)
def downgrade() -> None:
op.drop_column('users', 'org_id')
Batch Processing Pattern
BATCH_SIZE = 1000
def upgrade() -> None:
op.add_column('users', sa.Column('status', sa.String(20), nullable=True))
conn = op.get_bind()
total_updated = 0
while True:
result = conn.execute(sa.text("""
SELECT id FROM users
WHERE status IS NULL
LIMIT :batch_size
FOR UPDATE SKIP LOCKED
"""), {'batch_size': BATCH_SIZE})
ids = [row[0] for row in result]
if not ids:
break
conn.execute(sa.text("""
UPDATE users
SET status = CASE WHEN is_active THEN 'active' ELSE 'inactive' END
WHERE id = ANY(:ids)
"""), {'ids': ids})
total_updated += len(ids)
conn.commit() # Commit per batch to release locks
def downgrade() -> None:
op.drop_column('users', 'status')
Concurrent Index (Zero-Downtime)
def upgrade() -> None:
# CONCURRENTLY avoids table locks on large tables
# IMPORTANT: Cannot run inside transaction block
op.execute("COMMIT")
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
ON users (organization_id, created_at DESC)
""")
def downgrade() -> None:
op.execute("COMMIT")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org")Running Async Code in Migrations
from sqlalchemy import text
from sqlalchemy.util import await_only

def upgrade() -> None:
    connection = op.get_bind()
    # Alembic runs in a greenlet context, so await_only works
    result = await_only(
        connection.execute(text("SELECT count(*) FROM users"))
    )
Backfill Size Guide
| Table Size | Strategy | Notes |
|---|---|---|
| < 10K rows | Single UPDATE | Fast enough |
| 10K-1M rows | Batched UPDATE in migration | LIMIT + commit per batch |
| > 1M rows | Background script + trigger | Trigger for new rows, script for old |
Incorrect — Single-phase NOT NULL:
# Locks table for entire backfill duration
def upgrade():
op.add_column('users', sa.Column('org_id', UUID, nullable=False))
op.execute("UPDATE users SET org_id = 'default-uuid'")Correct — Two-phase NOT NULL:
# Phase 1: Add as nullable, backfill
def upgrade():
op.add_column('users', sa.Column('org_id', UUID, nullable=True))
op.execute("UPDATE users SET org_id = 'default-uuid' WHERE org_id IS NULL")
# Phase 2 (separate migration): Add NOT NULL after verification
Common Mistakes
- Adding NOT NULL without two-phase approach (locks entire table)
- Using blocking index creation on large tables (use CONCURRENTLY)
- Running CONCURRENTLY inside a transaction block (fails)
- Not committing between batches (holds locks too long)
- Skipping FOR UPDATE SKIP LOCKED in batch queries (risks deadlocks)
Select the right database engine based on workload requirements and trade-offs — HIGH
Database Selection Guide
The Default: PostgreSQL
PostgreSQL consistently ranks as the most-loved database in developer surveys, for good reason. Start with PostgreSQL unless you have a specific, validated reason not to.
-- PostgreSQL handles "document store" workloads with JSONB
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
metadata JSONB DEFAULT '{}'
);
CREATE INDEX idx_products_metadata ON products USING GIN (metadata);
SELECT * FROM products WHERE metadata @> '{"category": "electronics"}';
Key strengths: JSONB (covers most MongoDB-style use cases), full-text search (tsvector), extensions (PostGIS, pgvector, TimescaleDB), ACID transactions, universal ecosystem support.
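As a hedged sketch of the built-in full-text search (extending the products table above; column and index names are illustrative):
-- Generated tsvector column plus GIN index, queried with websearch_to_tsquery
ALTER TABLE products ADD COLUMN search_vector tsvector
    GENERATED ALWAYS AS (to_tsvector('english', coalesce(name, ''))) STORED;
CREATE INDEX idx_products_search ON products USING GIN (search_vector);
SELECT * FROM products WHERE search_vector @@ websearch_to_tsquery('english', 'wireless headphones');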
Decision Matrix by Project Tier
| Tier | Recommendation | Rationale |
|---|---|---|
| Interview / Take-home | SQLite or PostgreSQL | Zero config or standard choice |
| Hackathon / Prototype | PostgreSQL | Don't waste time on exotic choices |
| MVP (< 6 months) | PostgreSQL | One database, learn it well |
| Growth (1-5 engineers) | PostgreSQL + Redis (cache) | Add Redis only when measured |
| Enterprise (5+) | PostgreSQL primary + purpose-built secondaries | Specialized stores for validated bottlenecks |
Decision Matrix by Data Model
| Data Shape | Best Fit | Why |
|---|---|---|
| Relational with joins | PostgreSQL | Built for this |
| JSON documents with queries | PostgreSQL (JSONB) | Indexed JSON with SQL power |
| Truly schema-less, evolving weekly | MongoDB | Only if you never join |
| Key-value lookups | Redis | Sub-ms reads, ephemeral data |
| Time-series metrics | PostgreSQL + TimescaleDB | Or InfluxDB for extreme scale |
| Vector embeddings | PostgreSQL + pgvector | Or Pinecone for 100M+ vectors |
When to Use Each
- PostgreSQL: Everything, unless proven otherwise. Web apps, APIs, SaaS, complex queries, JSON, full-text search, geo, vectors.
- MongoDB: Truly document-shaped data with no relational needs. Content where schema changes weekly AND you never join.
- Redis: Caching, sessions, ephemeral data. Never as primary datastore.
- SQLite: Embedded, single-user, dev/testing, edge computing. Never for concurrent multi-user writes.
Anti-Patterns
Incorrect:
- Choosing MongoDB because "it's trendy" or "JSON is easier" without evaluating PostgreSQL JSONB
- Premature sharding before exhausting indexing, query optimization, read replicas, connection pooling
- SQLite in production with concurrent writes (causes SQLITE_BUSY errors)
- Redis as primary datastore (data persistence is best-effort)
- Running PostgreSQL + MongoDB + Redis + Elasticsearch when PostgreSQL alone covers it
Correct:
- Default to PostgreSQL, add specialized stores only for validated bottlenecks
- Before choosing non-PostgreSQL: benchmark (not assume), verify data model incompatibility, confirm team expertise
- Use Redis only as cache layer in front of PostgreSQL
- Use SQLite only for embedded/single-user/dev
References
- references/postgres-vs-mongodb.md — Head-to-head comparison, JSONB examples
- references/cost-comparison.md — Hosting costs, licensing, operational complexity
- references/db-migration-paths.md — MongoDB to PostgreSQL and SQLite to PostgreSQL strategies
- references/storage-and-cms.md — CMS backends, file/blob storage decisions
Plan migration rollbacks and monitor execution to prevent extended outages — HIGH
Migration Rollback and Monitoring
Incorrect — no rollback plan or monitoring:
-- FORBIDDEN: Constraint validation in same transaction as creation
ALTER TABLE orders ADD CONSTRAINT fk_org
FOREIGN KEY (org_id) REFERENCES orgs(id);
-- Impact: Full table scan with exclusive lock
-- FORBIDDEN: Backfill without batching
UPDATE users SET new_col = old_col;
-- Impact: Locks entire table, fills transaction log
-- FORBIDDEN: Skip environments
-- Always: local -> CI -> staging -> production
Correct — pgroll automated rollback:
# Install pgroll
brew install xataio/pgroll/pgroll
# Initialize pgroll in your database
pgroll init --postgres-url "postgres://user:pass@localhost/db"
{
"name": "001_add_email_verified",
"operations": [
{
"add_column": {
"table": "users",
"column": {
"name": "email_verified",
"type": "boolean",
"default": "false",
"nullable": false
},
"up": "false"
}
}
]
}
# Start migration (creates versioned schema)
pgroll start migrations/001_add_email_verified.json
# After verification, complete migration
pgroll complete
# Rollback if issues
pgroll rollback
Monitoring During Migration
-- Check for locks during migration
SELECT pid, now() - pg_stat_activity.query_start AS duration, query, state
FROM pg_stat_activity
WHERE (now() - pg_stat_activity.query_start) > interval '5 minutes'
AND state != 'idle';
-- Check replication lag (if using replicas)
SELECT client_addr, state, (sent_lsn - replay_lsn) AS replication_lag
FROM pg_stat_replication;
-- Monitor backfill progress
SELECT
COUNT(*) FILTER (WHERE display_name IS NOT NULL) as migrated,
COUNT(*) FILTER (WHERE display_name IS NULL) as remaining,
ROUND(100.0 * COUNT(*) FILTER (WHERE display_name IS NOT NULL) / COUNT(*), 2) as pct_complete
FROM users;
Key Decisions
| Decision | Recommendation |
|---|---|
| Automated rollback | Use pgroll for dual-schema versioning |
| Verification | Check pg_stat_statements before contract phase |
| Lock monitoring | Query pg_stat_activity during migration |
| Replication | Monitor lag before completing migration |
| Environment order | local -> CI -> staging -> production (never skip) |
Apply zero-downtime migration patterns to avoid table locks and production outages — CRITICAL
Zero-Downtime Migration Patterns
Incorrect — blocking schema changes:
-- FORBIDDEN: Single-step ALTER that locks table
ALTER TABLE users RENAME COLUMN name TO full_name;
-- Impact: Blocks ALL queries during metadata lock
-- FORBIDDEN: Add NOT NULL to existing column directly
ALTER TABLE orders ADD COLUMN org_id UUID NOT NULL;
-- Impact: Fails immediately if table has data
-- FORBIDDEN: Regular CREATE INDEX on large table
CREATE INDEX idx_big_table_col ON big_table(col);
-- Impact: Locks table for minutes/hours
-- FORBIDDEN: Drop column without verification period
ALTER TABLE users DROP COLUMN legacy_field;
-- Impact: No rollback if application still references it
Correct — expand-contract pattern:
Phase 1: EXPAND              Phase 2: MIGRATE             Phase 3: CONTRACT
Add new column (nullable)    Backfill data                Remove old column
Both versions work           Update app to use new        (after app fully migrated)
Manual Expand Phase
-- Step 1: Add new column (nullable, no default constraint yet)
ALTER TABLE users ADD COLUMN display_name VARCHAR(200);
-- Step 2: Create trigger for dual-write (if app can't dual-write)
CREATE OR REPLACE FUNCTION sync_display_name() RETURNS TRIGGER AS $$
BEGIN
NEW.display_name := CONCAT(NEW.first_name, ' ', NEW.last_name);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_sync_display_name
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_display_name();
-- Step 3: Backfill existing data (in batches)
UPDATE users SET display_name = CONCAT(first_name, ' ', last_name)
WHERE display_name IS NULL
AND id IN (SELECT id FROM users WHERE display_name IS NULL LIMIT 1000);
Manual Contract Phase
-- Step 1: Verify no readers of old column
SELECT * FROM pg_stat_statements
WHERE query LIKE '%first_name%' OR query LIKE '%last_name%';
-- Step 2: Drop trigger, then old columns ONLY after app fully migrated
DROP TRIGGER IF EXISTS trg_sync_display_name ON users;
ALTER TABLE users DROP COLUMN first_name;
ALTER TABLE users DROP COLUMN last_name;
ALTER TABLE users ALTER COLUMN display_name SET NOT NULL;
NOT VALID Constraint Pattern
-- Step 1: Add constraint without validating existing rows (instant)
ALTER TABLE orders ADD CONSTRAINT chk_amount_positive
CHECK (amount > 0) NOT VALID;
-- Step 2: Validate constraint (scans table but allows writes)
ALTER TABLE orders VALIDATE CONSTRAINT chk_amount_positive;
Key Decisions
| Decision | Recommendation |
|---|---|
| Tool choice | pgroll for automation, manual for simple cases |
| Column rename | Add new + copy + drop old (never RENAME) |
| Constraint timing | Add NOT VALID first, VALIDATE separately |
| Rollback window | Keep old schema 24-72 hours |
| Backfill batch size | 1000-10000 rows per batch |
| Index strategy | CONCURRENTLY always |
Design database indexes to optimize query performance without slowing writes — HIGH
Schema Indexing Strategies
When to Create Indexes
-- Index foreign keys (required for join/cascade performance)
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
-- Index columns in WHERE clauses
CREATE INDEX idx_users_email ON users(email);
-- Index ORDER BY / GROUP BY columns
CREATE INDEX idx_orders_created_at ON orders(created_at);
-- Composite index for multi-column queries
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status);
Composite Index Column Order
-- Good: Supports both queries
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status);
-- Uses index: WHERE customer_id = 123 AND status = 'pending'
-- Uses index: WHERE customer_id = 123 (leftmost prefix)
-- Does NOT use index: WHERE status = 'pending' (not leftmost)
Rule: Put the most selective column first, or the column most frequently queried alone.
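If status is also filtered on its own, add a second index for that pattern rather than relying on the composite (illustrative):
-- Covers WHERE status = 'pending' queries that do not include customer_id
CREATE INDEX idx_orders_status ON orders(status);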
Index Types
B-Tree (Default)
Equality, range queries, sorting.
CREATE INDEX idx_analyses_url ON analyses(url);
CREATE INDEX idx_analyses_status ON analyses(status);
GIN (Inverted Index)
Full-text search (TSVECTOR), JSONB, arrays.
CREATE INDEX idx_analyses_search_vector ON analyses USING GIN(search_vector);
CREATE INDEX idx_artifact_metadata_gin ON artifacts USING GIN(artifact_metadata);
HNSW (Vector Similarity)
Approximate nearest neighbor search (embeddings).
CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- m=16: connections per layer | ef_construction=64: build quality
Partial Indexes
Filter frequently queried subsets.
-- Only index completed analyses (common query)
CREATE INDEX idx_analyses_completed ON analyses(created_at DESC) WHERE status = 'complete';
Covering Indexes (Index-Only Scans)
Include all queried columns.
CREATE INDEX idx_analyses_status_covering
ON analyses(status, created_at DESC) INCLUDE (id, title);
-- PostgreSQL can satisfy the query entirely from the index
Index Maintenance
-- Find unused indexes (candidates for removal)
SELECT indexname, idx_scan FROM pg_stat_user_indexes
WHERE idx_scan = 0 AND indexname NOT LIKE '%_pkey';
-- Rebuild bloated indexes
REINDEX INDEX CONCURRENTLY idx_chunks_vector_hnsw;
Constraints as Validation
CREATE TABLE products (
id INT PRIMARY KEY,
price DECIMAL(10, 2) CHECK (price >= 0),
stock INT CHECK (stock >= 0),
discount_percent INT CHECK (discount_percent BETWEEN 0 AND 100)
);
Incorrect — Missing foreign key index:
-- FK without index causes slow joins
CREATE TABLE orders (
id UUID PRIMARY KEY,
customer_id UUID REFERENCES customers(id)
-- Missing: CREATE INDEX idx_orders_customer ON orders(customer_id)
);
Correct — Index all foreign keys:
-- Index enables fast joins and cascade deletes
CREATE TABLE orders (
id UUID PRIMARY KEY,
customer_id UUID REFERENCES customers(id)
);
CREATE INDEX idx_orders_customer ON orders(customer_id);
Anti-Patterns
- Over-indexing: Every index slows writes. Only index actual query patterns.
- Missing FK indexes: Causes slow joins and cascading deletes.
- Wrong column order: Composite indexes only use leftmost prefix.
- FLOAT for money: Use DECIMAL(10, 2) for financial values.
Apply normalization rules to eliminate data redundancy and update anomalies — HIGH
Schema Normalization Patterns
Normal Forms
1st Normal Form (1NF)
Each column contains atomic values, no repeating groups.
-- WRONG: Multiple values in one column
CREATE TABLE orders (
id INT PRIMARY KEY,
product_ids VARCHAR(255) -- '101,102,103' (bad!)
);
-- CORRECT: Separate junction table
CREATE TABLE orders (id INT PRIMARY KEY, customer_id INT);
CREATE TABLE order_items (
id INT PRIMARY KEY,
order_id INT REFERENCES orders(id),
product_id INT
);
2nd Normal Form (2NF)
All non-key columns depend on the entire primary key.
-- WRONG: order_date depends only on order_id
CREATE TABLE order_items (
order_id UUID, product_id UUID,
order_date TIMESTAMP, -- Partial dependency!
PRIMARY KEY (order_id, product_id)
);
-- CORRECT: Separate tables
CREATE TABLE orders (id UUID PRIMARY KEY, order_date TIMESTAMP NOT NULL);
CREATE TABLE order_items (
id UUID PRIMARY KEY,
order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
product_id UUID NOT NULL, quantity INTEGER NOT NULL CHECK (quantity > 0)
);
3rd Normal Form (3NF)
No transitive dependencies (non-key columns depend only on primary key).
-- WRONG: country_name depends on country_code
CREATE TABLE users (id UUID PRIMARY KEY, country_code TEXT, country_name TEXT);
-- CORRECT: Extract to separate table
CREATE TABLE countries (code TEXT PRIMARY KEY, name TEXT NOT NULL UNIQUE);
CREATE TABLE users (id UUID PRIMARY KEY, country_code TEXT REFERENCES countries(code));
When to Denormalize
Denormalize only after profiling shows bottlenecks.
-- Denormalized counter (faster reads)
CREATE TABLE analyses (
id UUID PRIMARY KEY,
artifact_count INTEGER DEFAULT 0 -- Maintained by trigger
);
CREATE FUNCTION update_artifact_count() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
UPDATE analyses SET artifact_count = artifact_count + 1 WHERE id = NEW.analysis_id;
ELSIF TG_OP = 'DELETE' THEN
UPDATE analyses SET artifact_count = artifact_count - 1 WHERE id = OLD.analysis_id;
END IF;
RETURN NULL;
END; $$ LANGUAGE plpgsql;
JSON vs Normalized Tables
| Use JSON When | Use Normalized When |
|---|---|
| Schema is flexible/evolving | Need foreign key constraints |
| Data rarely queried individually | Frequent filtering/sorting |
| Structure varies per row | Complex queries (joins, aggregations) |
-- JSON: Flexible metadata
extraction_metadata JSONB -- {"fetch_time_ms": 1234, "charset": "utf-8"}
-- Normalized: Structured queryable data
CREATE TABLE agent_findings (
analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
agent_type TEXT NOT NULL,
findings JSONB NOT NULL -- Hybrid: FK + JSONB
);
Incorrect — Violating 1NF (repeating groups):
-- Multiple values in one column
CREATE TABLE orders (
id UUID PRIMARY KEY,
product_ids TEXT -- '101,102,103' stored as CSV
);
Correct — Junction table (1NF compliant):
-- Atomic values, no repeating groups
CREATE TABLE orders (id UUID PRIMARY KEY);
CREATE TABLE order_items (
order_id UUID REFERENCES orders(id),
product_id UUID,
PRIMARY KEY (order_id, product_id)
);
Design Philosophy
- Model the domain, not the UI - Schema reflects business entities
- Optimize for reads OR writes - OLTP normalized, OLAP denormalized
- Data integrity over performance - Constraints first, optimize later
- Plan for scale from day one - Indexing, partitioning, caching strategy
Design NoSQL schemas with embed vs reference trade-offs for query performance — HIGH
NoSQL Schema Design Patterns
SQL vs NoSQL Decision
| Factor | SQL (PostgreSQL) | NoSQL (MongoDB) |
|---|---|---|
| Data relationships | Complex, many-to-many | Simple, hierarchical |
| Query patterns | Ad-hoc, complex joins | Known access patterns |
| Consistency | Strong (ACID) | Eventual (configurable) |
| Schema | Rigid, enforced | Flexible, evolving |
| Scale | Vertical + read replicas | Horizontal sharding |
Embed vs Reference
Embed When:
- Data is always accessed together
- Relationship is 1:1 or 1:few
- Embedded data rarely changes independently
{
"_id": "user-123",
"name": "Alice",
"address": {
"street": "123 Main St",
"city": "Portland",
"state": "OR"
}
}
Reference When:
- Data is accessed independently
- Relationship is 1:many or many:many
- Referenced data changes frequently
// users collection
{ "_id": "user-123", "name": "Alice", "org_id": "org-456" }
// organizations collection
{ "_id": "org-456", "name": "Acme Corp", "plan": "enterprise" }Document Size Limits
| Database | Max Document Size | Recommendation |
|---|---|---|
| MongoDB | 16 MB | Keep under 1 MB for performance |
| DynamoDB | 400 KB | Split large items across multiple records |
| Firestore | 1 MB | Use subcollections for large data |
Sharding Strategy
Choose a shard key that ensures even distribution:
// Good: High cardinality, even distribution
db.orders.createIndex({ customer_id: 1, created_at: 1 });
sh.shardCollection("mydb.orders", { customer_id: "hashed" });
// Bad: Low cardinality (hot shard)
sh.shardCollection("mydb.orders", { status: 1 }); // Only 3-5 values!Schema Validation (MongoDB)
db.createCollection("users", {
validator: {
$jsonSchema: {
bsonType: "object",
required: ["name", "email", "created_at"],
properties: {
name: { bsonType: "string", minLength: 1 },
email: { bsonType: "string", pattern: "^.+@.+\\..+$" },
status: { enum: ["active", "inactive", "suspended"] },
created_at: { bsonType: "date" }
}
}
}
});
Key Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Embed vs reference | Based on access patterns | Co-located reads are faster |
| Sharding key | High cardinality, hashed | Even data distribution |
| Consistency level | Start with strong, relax if needed | Data integrity first |
| Schema validation | Always define for core collections | Catches bugs early |
Incorrect — Unbounded embedded array:
{
"_id": "user-123",
"orders": [
{"id": "order-1", "total": 100},
{"id": "order-2", "total": 200}
]
}
Correct — Reference pattern for 1:many:
// users collection
{"_id": "user-123", "name": "Alice"}
// orders collection (separate)
{"_id": "order-1", "user_id": "user-123", "total": 100}
{"_id": "order-2", "user_id": "user-123", "total": 200}Common Mistakes
- Embedding unbounded arrays (document grows forever)
- Using sequential shard keys (hot partition)
- Not defining schema validation (schema chaos)
- Treating NoSQL as "no schema" (still needs design)
Track schema version history with changelogs and audit trails for rollback safety — HIGH
Versioning Changelog & Audit Trails
Schema Version Table
CREATE TABLE schema_version (
version_id SERIAL PRIMARY KEY,
version_number VARCHAR(20) NOT NULL,
description TEXT NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
applied_by VARCHAR(100),
execution_time_ms INTEGER,
checksum VARCHAR(64),
CONSTRAINT uq_version_number UNIQUE (version_number)
);
Semantic Versioning for Databases
MAJOR.MINOR.PATCH
MAJOR: Breaking changes (drop tables, rename columns)
MINOR: Backward-compatible additions (new tables, nullable columns)
PATCH: Bug fixes, index changes, data migrations
Row-Level Versioning
CREATE TABLE products (
id UUID PRIMARY KEY,
name VARCHAR(200) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
valid_from TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
valid_to TIMESTAMP WITH TIME ZONE,
is_current BOOLEAN NOT NULL DEFAULT TRUE,
created_by VARCHAR(100) NOT NULL,
updated_by VARCHAR(100)
);
CREATE INDEX idx_products_current ON products (id) WHERE is_current = TRUE;
CREATE INDEX idx_products_temporal ON products (id, valid_from, valid_to);
Change Data Capture (CDC)
CREATE TABLE change_log (
id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
operation VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
record_id UUID NOT NULL,
old_data JSONB,
new_data JSONB,
changed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
changed_by VARCHAR(100),
transaction_id BIGINT DEFAULT txid_current()
);
CREATE OR REPLACE FUNCTION log_changes()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO change_log (table_name, operation, record_id, new_data, changed_by)
VALUES (TG_TABLE_NAME, 'INSERT', NEW.id, to_jsonb(NEW), current_user);
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO change_log (table_name, operation, record_id, old_data, new_data, changed_by)
VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id, to_jsonb(OLD), to_jsonb(NEW), current_user);
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO change_log (table_name, operation, record_id, old_data, changed_by)
VALUES (TG_TABLE_NAME, 'DELETE', OLD.id, to_jsonb(OLD), current_user);
END IF;
RETURN COALESCE(NEW, OLD);
END; $$ LANGUAGE plpgsql;
Audit Pattern Decision Matrix
| Pattern | Use Case | Complexity |
|---|---|---|
| Row-level versioning | Simple history needs | Low |
| Temporal tables | Point-in-time queries | Medium |
| CDC (change_log) | Full audit compliance | High |
Object Versioning
-- Versioned views
CREATE VIEW orders_summary_v1 AS SELECT order_id, customer_id, total FROM orders;
CREATE VIEW orders_summary_v2 AS SELECT order_id, customer_id, total, shipping_cost FROM orders;
CREATE VIEW orders_summary AS SELECT * FROM orders_summary_v2; -- Current alias
Incorrect — Mutable audit log:
-- Allows modification/deletion of history
CREATE TABLE audit_log (
id SERIAL PRIMARY KEY,
action TEXT,
changed_at TIMESTAMP
);
-- Missing: Triggers, permissions to prevent changes
Correct — Immutable audit trail:
-- Append-only with JSONB for full history
CREATE TABLE change_log (
id BIGSERIAL PRIMARY KEY,
old_data JSONB,
new_data JSONB,
changed_at TIMESTAMP DEFAULT NOW()
);
REVOKE DELETE, UPDATE ON change_log FROM app_user;
Best Practices
| Practice | Reason |
|---|---|
| Version everything | Full traceability |
| Immutable history | Audit compliance |
| Checksum verification | Detect unauthorized changes |
| Semantic versioning | Clear impact communication |
Detect schema drift between environments using checksum verification and coordination — HIGH
Versioning Drift Detection
Multi-Environment Migration Flow
Local (dev) -> CI (test) -> Staging (preview) -> Production (live)
Run alembic upgrade head at each stage.
Never skip environments. Always: local -> CI -> staging -> production.
Checksum Verification
def test_migration_checksums(alembic_config):
"""Verify migrations haven't been modified after deployment."""
script = ScriptDirectory.from_config(alembic_config)
for revision in script.walk_revisions():
if revision.revision in DEPLOYED_MIGRATIONS:
current_checksum = calculate_checksum(revision.path)
expected_checksum = DEPLOYED_MIGRATIONS[revision.revision]
assert current_checksum == expected_checksum, \
f"Migration {revision.revision} was modified after deployment!"Environment-Specific Settings
# alembic/env.py
import os
def run_migrations_online():
env = os.getenv("ENVIRONMENT", "development")
if env == "production":
context.configure(
connection=connection,
target_metadata=target_metadata,
transaction_per_migration=True,
postgresql_set_session_options={
"statement_timeout": "30s",
"lock_timeout": "10s"
}
)
else:
context.configure(
connection=connection,
target_metadata=target_metadata
)
Migration Locks (Prevent Concurrent Migrations)
"""Migration with advisory lock.
Prevents multiple instances from running migrations simultaneously.
"""
def upgrade():
# Acquire advisory lock (blocks until available)
op.execute(text("SELECT pg_advisory_lock(12345)"))
try:
op.create_table('new_table', ...)
finally:
op.execute(text("SELECT pg_advisory_unlock(12345)"))Migration Numbering Schemes
Option 1: Sequential 001_initial.sql, 002_add_users.sql
Option 2: Timestamp 20260115120000_initial.sql
Option 3: Hybrid 2026_01_15_001_initial.sql
Recommendation: Timestamp-based (avoids conflicts in parallel development).
Conditional Migration Logic
"""Add analytics index (production only)."""
def upgrade() -> None:
if os.getenv('ENVIRONMENT') == 'production':
op.execute("COMMIT")
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_timestamp
ON events (timestamp DESC)
""")
else:
op.create_index('idx_events_timestamp', 'events', ['timestamp'])
Best Practices
| Practice | Reason |
|---|---|
| Always test migrations locally first | Catch errors early |
| Use transaction per migration | Atomic rollback on failure |
| Set lock timeouts in production | Prevent long-held locks |
| Never skip environments | Ensure consistency |
| Checksum deployed migrations | Detect unauthorized changes |
| Environment parity | Consistent deployments |
Incorrect — Skipping environments:
# Dangerous: Deploy to prod without staging test
alembic upgrade head # On production DB directly
Correct — Progressive deployment:
# Safe: Test in each environment sequentially
alembic upgrade head # Local
# CI tests pass
alembic upgrade head # Staging
# Smoke tests pass
alembic upgrade head # Production
Anti-Patterns
- Modifying deployed migrations (create new migration instead)
- Skipping staging and deploying directly to production
- Running concurrent migrations without advisory locks
- Versioning sensitive data in migrations (security risk)
Test rollback paths with verified downgrade procedures and documented data loss risks — HIGH
Versioning Rollback Patterns
Full Cycle Testing
# tests/test_migrations.py
import pytest
from alembic.config import Config
from alembic import command
from alembic.script import ScriptDirectory
@pytest.fixture
def alembic_config():
return Config("alembic.ini")
def test_migrations_upgrade_downgrade(alembic_config, test_db):
"""Test all migrations can be applied and rolled back."""
command.upgrade(alembic_config, "head")
command.downgrade(alembic_config, "base")
assert get_table_count(test_db) == 0
Local Rollback Verification
# Apply migration
alembic upgrade head
# Verify schema change
psql -c "\d tablename"
# Rollback migration
alembic downgrade -1
# Verify rollback complete
psql -c "\d tablename"
# Re-apply to confirm idempotency
alembic upgrade head
Data Integrity Testing
def test_migration_preserves_data(alembic_config, test_db):
"""Verify migration doesn't lose data."""
insert_test_records(test_db, count=100)
original_count = get_record_count(test_db)
command.upgrade(alembic_config, "+1")
new_count = get_record_count(test_db)
assert new_count == original_count
Rollback Safety Check
def test_rollback_safety(alembic_config, test_db):
"""Verify rollback restores previous state."""
command.upgrade(alembic_config, "head")
pre_rollback_schema = get_schema_snapshot(test_db)
apply_pending_migration(alembic_config)
command.downgrade(alembic_config, "-1")
post_rollback_schema = get_schema_snapshot(test_db)
assert pre_rollback_schema == post_rollback_schema
Documenting Destructive Rollbacks
"""Split phone_numbers column into user_phones table.
Revision ID: abc123
Revises: xyz456
WARNING: Downgrade will result in data loss. Phone number type
(mobile/home/work) cannot be restored to original format.
"""
def downgrade() -> None:
# DESTRUCTIVE: Cannot restore original format
op.add_column('users', sa.Column('phone_numbers', sa.Text, nullable=True))
op.drop_table('user_phones')
CI Integration
# .github/workflows/migrations.yml
migration-test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16
env:
POSTGRES_PASSWORD: test
steps:
- uses: actions/checkout@v4
- name: Test migrations
run: pytest tests/test_migrations.py -v
Anti-Patterns
# NEVER: Skip downgrade implementation
def downgrade():
pass # WRONG - implement proper rollback
# NEVER: Modify deployed migrations - create new migration instead
# NEVER: Delete migration history
command.stamp(alembic_config, "head") # Loses historyIncorrect — Empty downgrade:
def upgrade():
op.create_table('users', ...)
def downgrade():
pass # Rollback won't work!
Correct — Implement downgrade:
def upgrade():
op.create_table('users', ...)
def downgrade():
op.drop_table('users') # Full rollback support
Key Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| Test rollbacks | Always verify locally + CI | Prevents production surprises |
| Destructive rollbacks | Document in migration docstring | Clear risk communication |
| Production rollback | alembic downgrade -1 then investigate | Fast recovery, fix later |
| Immutable migrations | Never modify after deployment | Audit trail integrity |
References (11)
Alembic Advanced
Alembic Advanced Implementation Guide
Advanced patterns for production database migrations with Alembic.
Multi-Database Migrations
Configuration Setup
# alembic/env.py - Multi-database support
from alembic import context
from sqlalchemy import engine_from_config, pool
# Define multiple databases
DATABASES = {
'default': 'postgresql://user:pass@localhost/main',
'analytics': 'postgresql://user:pass@localhost/analytics',
'audit': 'postgresql://user:pass@localhost/audit',
}
def run_migrations_online():
"""Run migrations for each database."""
for db_name, url in DATABASES.items():
config = context.config
config.set_main_option('sqlalchemy.url', url)
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=get_metadata(db_name),
version_table=f'alembic_version_{db_name}',
)
with context.begin_transaction():
context.run_migrations()
Per-Database Migrations
# migrations/versions/abc123_add_analytics_table.py
"""Add analytics events table.
Revision ID: abc123
Database: analytics
"""
from alembic import op
import sqlalchemy as sa
revision = 'abc123'
down_revision = 'xyz789'
branch_labels = ('analytics',) # Database-specific branch
def upgrade() -> None:
op.create_table('events',
sa.Column('id', sa.BigInteger, primary_key=True),
sa.Column('event_type', sa.String(100), nullable=False),
sa.Column('payload', sa.JSON, nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
)
def downgrade() -> None:
op.drop_table('events')
Data Migrations with Batching
Batch Processing Pattern
"""Backfill user_status column with batching.
Revision ID: def456
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.orm import Session
BATCH_SIZE = 1000
def upgrade() -> None:
# Phase 1: Add nullable column
op.add_column('users', sa.Column('status', sa.String(20), nullable=True))
# Phase 2: Backfill in batches
conn = op.get_bind()
session = Session(bind=conn)
total_updated = 0
while True:
# Fetch batch of IDs without status
result = conn.execute(sa.text("""
SELECT id FROM users
WHERE status IS NULL
LIMIT :batch_size
FOR UPDATE SKIP LOCKED
"""), {'batch_size': BATCH_SIZE})
ids = [row[0] for row in result]
if not ids:
break
# Update batch
conn.execute(sa.text("""
UPDATE users
SET status = CASE
WHEN is_active THEN 'active'
ELSE 'inactive'
END
WHERE id = ANY(:ids)
"""), {'ids': ids})
total_updated += len(ids)
conn.commit() # Commit per batch to release locks
print(f"Backfilled {total_updated} rows...")
# Phase 3: Add NOT NULL constraint (separate migration recommended)
def downgrade() -> None:
op.drop_column('users', 'status')
Branch Management and Merging
Creating Migration Branches
# Create a feature branch
alembic revision --branch-label=feature_payments -m "start payments feature"
# Create revision on branch
alembic revision --head=feature_payments@head -m "add payment_methods table"
# View branch structure
alembic branches
# Merge branches before deployment
alembic merge feature_payments@head main@head -m "merge payments feature"
Resolving Branch Conflicts
"""Merge feature_payments and main branches.
Revision ID: merge_abc
Revises: ('abc123', 'def456')
"""
revision = 'merge_abc'
down_revision = ('abc123', 'def456') # Tuple for merge
def upgrade() -> None:
# No operations - just marks merge point
pass
def downgrade() -> None:
# Cannot downgrade past merge - requires specific branch
raise Exception("Cannot downgrade past merge point")Online Schema Changes (CONCURRENTLY)
Index Creation Without Locks
"""Add index concurrently on large table.
Revision ID: idx123
"""
from alembic import op
# CRITICAL: Disable transaction for CONCURRENTLY operations
def upgrade() -> None:
# Exit transaction block
op.execute("COMMIT")
# Create index without locking reads/writes
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_customer_date
ON orders (customer_id, created_at DESC)
WHERE status != 'cancelled'
""")
def downgrade() -> None:
op.execute("COMMIT")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_orders_customer_date")Column Rename (Expand-Contract Pattern)
"""Phase 1: Add new column alongside old.
Revision ID: rename_phase1
"""
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
# Add new column
op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
# Create trigger to sync during transition
op.execute("""
CREATE OR REPLACE FUNCTION sync_user_name()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
NEW.full_name = COALESCE(NEW.full_name, NEW.name);
NEW.name = COALESCE(NEW.name, NEW.full_name);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_sync_user_name
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_user_name();
""")
def downgrade() -> None:
op.execute("DROP TRIGGER IF EXISTS trg_sync_user_name ON users")
op.execute("DROP FUNCTION IF EXISTS sync_user_name()")
op.drop_column('users', 'full_name')
Environment-Specific Migrations
Conditional Migration Logic
"""Add analytics index (production only).
Revision ID: prod_idx
"""
from alembic import op
import os
def upgrade() -> None:
# Only create expensive index in production
if os.getenv('ENVIRONMENT') == 'production':
op.execute("COMMIT")
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_timestamp
ON events (timestamp DESC)
""")
else:
# Simpler non-concurrent for dev/test
op.create_index('idx_events_timestamp', 'events', ['timestamp'])
def downgrade() -> None:
if os.getenv('ENVIRONMENT') == 'production':
op.execute("COMMIT")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_events_timestamp")
else:
op.drop_index('idx_events_timestamp', table_name='events')
Migration Hooks
Pre/Post Migration Callbacks
# alembic/env.py
from alembic import context
def before_migration(ctx, revision, heads):
"""Run before each migration."""
print(f"Starting migration: {revision}")
# Notify monitoring, acquire locks, etc.
def after_migration(ctx, revision, heads):
"""Run after each migration."""
print(f"Completed migration: {revision}")
# Clear caches, notify services, etc.
context.configure(
# ... other config ...
on_version_apply=after_migration,
)
Audit Trails
Database Audit Trail Patterns
Row-Level Versioning
CREATE TABLE products (
id UUID PRIMARY KEY,
name VARCHAR(200) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
-- Versioning columns
version INTEGER NOT NULL DEFAULT 1,
valid_from TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
valid_to TIMESTAMP WITH TIME ZONE,
is_current BOOLEAN NOT NULL DEFAULT TRUE,
-- Audit columns
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
created_by VARCHAR(100) NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE,
updated_by VARCHAR(100)
);
-- Index for current records
CREATE INDEX idx_products_current ON products (id) WHERE is_current = TRUE;
-- Index for temporal queries
CREATE INDEX idx_products_temporal ON products (id, valid_from, valid_to);
Temporal Tables (temporal_tables extension)
-- Enable temporal tables extension
CREATE EXTENSION IF NOT EXISTS temporal_tables;
-- Create temporal table
CREATE TABLE products (
id UUID PRIMARY KEY,
name VARCHAR(200) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
sys_period TSTZRANGE NOT NULL DEFAULT tstzrange(NOW(), NULL)
);
-- History table
CREATE TABLE products_history (LIKE products);
-- Trigger for automatic versioning
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION versioning(
'sys_period', 'products_history', true
);
-- Query at a point in time
SELECT * FROM products
WHERE sys_period @> '2026-01-15 10:00:00+00'::timestamptz;
Change Data Capture (CDC)
-- CDC table for tracking all changes
CREATE TABLE change_log (
id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
operation VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
record_id UUID NOT NULL,
old_data JSONB,
new_data JSONB,
changed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
changed_by VARCHAR(100),
transaction_id BIGINT DEFAULT txid_current()
);
-- Generic trigger function
CREATE OR REPLACE FUNCTION log_changes()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO change_log (table_name, operation, record_id, new_data, changed_by)
VALUES (TG_TABLE_NAME, 'INSERT', NEW.id, to_jsonb(NEW), current_user);
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO change_log (table_name, operation, record_id, old_data, new_data, changed_by)
VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id, to_jsonb(OLD), to_jsonb(NEW), current_user);
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO change_log (table_name, operation, record_id, old_data, changed_by)
VALUES (TG_TABLE_NAME, 'DELETE', OLD.id, to_jsonb(OLD), current_user);
RETURN OLD;
END IF;
END;
$$ LANGUAGE plpgsql;
-- Apply to tables
CREATE TRIGGER products_audit
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION log_changes();
Decision Matrix
| Pattern | Use Case | Complexity |
|---|---|---|
| Row-level versioning | Simple history needs | Low |
| Temporal tables | Point-in-time queries | Medium |
| CDC | Full audit compliance | High |
Cost Comparison
Database Cost Comparison
Hosting and operational cost analysis to inform database selection decisions.
Managed Hosting Costs (Approximate Monthly)
| Provider | PostgreSQL | MongoDB | Redis |
|---|---|---|---|
| Free tier | Supabase, Neon, Render | Atlas M0 (512 MB) | Upstash (10K cmds/day) |
| Hobby ($5-25) | Supabase Pro ($25), Railway ($5+), Render ($7+) | Atlas M10 ($57+) | Upstash Pay-as-go, Redis Cloud ($5+) |
| Production ($50-200) | Supabase Pro, RDS db.t3.medium | Atlas M30 ($230+) | ElastiCache t3.small |
| Scale ($200-1000) | RDS db.r6g.large | Atlas M50 ($500+) | ElastiCache r6g.large (~$150) |
Key takeaway: MongoDB managed hosting costs 2-3x more than PostgreSQL at equivalent tiers. MongoDB Atlas pricing reflects the vendor lock-in premium.
License Considerations
| Database | License | Impact |
|---|---|---|
| PostgreSQL | PostgreSQL License | Fully permissive. Host anywhere, modify freely. |
| MongoDB | SSPL | Cannot offer MongoDB as a managed service. Limits cloud provider hosting options. |
| Redis | RSALv2 + SSPLv1 (since 2024) | Source-available but not OSS. Alternatives: Valkey (Linux Foundation fork), KeyDB, DragonflyDB. |
| SQLite | Public Domain | Zero restrictions. Embedded in everything. |
Operational Complexity
| Factor | PostgreSQL | MongoDB | Redis | SQLite |
|---|---|---|---|---|
| Backup/restore | pg_dump, pg_basebackup, WAL archiving | mongodump, oplog | RDB snapshots, AOF | File copy |
| Monitoring | pg_stat_statements, pgBadger | Atlas monitoring, mongotop | redis-cli INFO, RedisInsight | N/A |
| Scaling reads | Read replicas (simple) | Replica sets (moderate) | Redis Cluster (moderate) | N/A |
| Scaling writes | Partitioning, Citus (moderate) | Sharding (complex) | Redis Cluster (moderate) | N/A |
| Team expertise needed | Moderate (widely known) | Moderate (less common) | Low (simple API) | Minimal |
| Connection pooling | PgBouncer (essential at scale) | Built-in driver pooling | Built-in | N/A |
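A small sketch of the pg_stat_statements workflow mentioned above (the extension must also be listed in shared_preload_libraries; column names per PostgreSQL 13+):
-- Enable the extension, then inspect the most expensive queries
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
SELECT query, calls, total_exec_time, mean_exec_time
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;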
Total Cost of Ownership Factors
Beyond hosting, consider:
- Developer time: PostgreSQL has more tutorials, Stack Overflow answers, and ORM support than any alternative
- Hiring: PostgreSQL/SQL skills are universal; MongoDB-specific expertise is niche
- Migration cost: Starting with PostgreSQL avoids expensive future migrations
- Extension ecosystem: PostGIS, pgvector, TimescaleDB, pg_cron are free — equivalent MongoDB features require paid Atlas tiers
- Vendor lock-in: MongoDB Atlas features (Atlas Search, Charts, App Services) don't transfer to self-hosted
Budget Decision Tree
Budget = $0?
YES --> SQLite (embedded) or Supabase/Neon free tier (managed Postgres)
NO -->
Budget < $50/mo?
YES --> Managed PostgreSQL (Supabase, Railway, Render)
NO -->
Budget < $500/mo?
YES --> PostgreSQL (managed) + Redis (Upstash or small ElastiCache)
NO -->
Budget $500+/mo?
YES --> PostgreSQL (RDS/Aurora/Cloud SQL) + Redis + purpose-built stores as needed
Db Migration Paths
Database Migration Paths
Common migration scenarios, tools, and risk assessment.
Migration Risk Matrix
| Migration | Difficulty | Risk | Downtime | Typical Duration |
|---|---|---|---|---|
| SQLite to PostgreSQL | Low | Low | Minutes | 1-2 days dev work |
| MongoDB to PostgreSQL | Medium-High | Medium | Hours (with planning) | 1-4 weeks |
| MySQL to PostgreSQL | Medium | Low-Medium | Hours | 1-2 weeks |
| PostgreSQL to PostgreSQL (version upgrade) | Low | Low | Minutes (pg_upgrade) | Hours |
| Single PostgreSQL to read replicas | Low | Low | Near-zero | 1-2 days |
| Redis cache swap (e.g., to Valkey) | Low | Low | Minutes | 1 day |
SQLite to PostgreSQL
The most common migration path for growing projects.
When: App outgrows single-user/embedded model and needs concurrent writes.
Steps:
- Install PostgreSQL and create target database
- Use pgloader (recommended) for automated schema + data migration
- Update connection string and ORM configuration
- Replace SQLite-specific syntax (e.g., AUTOINCREMENT to SERIAL)
- Add connection pooling (PgBouncer for production)
- Test concurrent write scenarios
Tools: pgloader (handles schema conversion automatically), sqlite3 .dump + manual SQL cleanup.
Common gotchas:
- SQLite `INTEGER PRIMARY KEY` is auto-increment; PostgreSQL needs `SERIAL` or `GENERATED ALWAYS AS IDENTITY`
- SQLite has loose typing; PostgreSQL enforces column types strictly
- Date/time handling differs — SQLite stores as text, PostgreSQL has native types
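The auto-increment gotcha above is easiest to handle at the model layer. A minimal sketch (table and column names are illustrative, assuming SQLAlchemy 2.0) of a primary key that emits GENERATED ALWAYS AS IDENTITY on PostgreSQL:

```python
import sqlalchemy as sa
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    # Renders as GENERATED ALWAYS AS IDENTITY on PostgreSQL, replacing the
    # implicit auto-increment behavior of SQLite's INTEGER PRIMARY KEY.
    id = sa.Column(sa.Integer, sa.Identity(always=True), primary_key=True)
    name = sa.Column(sa.Text, nullable=False)
```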
MongoDB to PostgreSQL
The most impactful migration. Plan carefully.
When: Team realizes relational queries, JOINs, or ACID transactions are needed.
Strategy:
- Schema mapping: Map each collection to a table. For truly variable documents, use a typed columns + JSONB hybrid:

CREATE TABLE products (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT NOT NULL,
    category TEXT NOT NULL,
    price NUMERIC(10,2) NOT NULL,
    metadata JSONB  -- Variable fields go here
);

- Data extraction: `mongoexport --jsonArray` per collection, transform with scripts (see the sketch after this list), load with `COPY`
- Query rewriting: Convert aggregation pipelines to SQL
  - `$match` becomes `WHERE`
  - `$group` becomes `GROUP BY`
  - `$lookup` becomes `JOIN`
  - `$unwind` becomes `LATERAL JOIN` or `jsonb_array_elements`
- Dual-write period: Write to both databases during transition, read from PostgreSQL, compare results
- Cutover: Switch reads to PostgreSQL, decommission MongoDB
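A minimal sketch of the "transform with scripts" step for the hybrid table above, assuming a products collection exported with mongoexport --jsonArray (file paths and field names are illustrative):

```python
import csv
import json

TYPED_FIELDS = ("name", "category", "price")  # Columns promoted out of the document

with open("products.json") as src, open("products.csv", "w", newline="") as dst:
    writer = csv.writer(dst)
    writer.writerow([*TYPED_FIELDS, "metadata"])
    for doc in json.load(src):
        doc.pop("_id", None)  # Let PostgreSQL generate the UUID primary key instead
        typed = [doc.pop(field, None) for field in TYPED_FIELDS]
        writer.writerow([*typed, json.dumps(doc)])  # Remaining fields land in JSONB

# Load the result with:
#   \copy products (name, category, price, metadata) FROM 'products.csv' CSV HEADER
```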
Tools: mongoexport, custom ETL scripts (Python recommended), pgloader (limited MongoDB support).
Risk mitigations:
- Run dual-write for at least 1 week in production
- Compare query results between both databases automatically
- Keep MongoDB running (read-only) for 30 days post-migration as rollback
MySQL to PostgreSQL
When: Need advanced features (JSONB, CTEs, window functions, extensions) or better standards compliance.
Steps:
- Use `pgloader` for automated migration (handles most type conversions)
- Review and fix: `ENUM` types, `UNSIGNED` integers, `AUTO_INCREMENT` to `SERIAL`
- Replace MySQL-specific functions (`IFNULL` to `COALESCE`, `LIMIT x,y` to `LIMIT y OFFSET x`)
- Update stored procedures (PL/pgSQL syntax differs from MySQL procedures)
Tools: pgloader (best option), AWS DMS (for RDS-to-RDS), mysqldump + manual conversion.
Adding Read Replicas (PostgreSQL)
When: Read-heavy workload saturates primary, measured (not assumed).
Steps:
- Set up streaming replication (`primary_conninfo` in `recovery.conf`/`postgresql.auto.conf`)
- Configure application for read/write splitting (write to primary, read from replica)
- Handle replication lag in application logic (eventual consistency for reads)
Tools: Built-in streaming replication, Patroni (HA), PgBouncer (connection routing).
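A minimal sketch of application-side read/write splitting with SQLAlchemy 2.0 async engines (connection URLs and the session helper are illustrative; replica reads must tolerate replication lag):

```python
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Illustrative DSNs: one primary for writes, one streaming replica for reads.
primary = create_async_engine("postgresql+asyncpg://app@db-primary/appdb")
replica = create_async_engine("postgresql+asyncpg://app@db-replica/appdb")

WriteSession = async_sessionmaker(primary, expire_on_commit=False)
ReadSession = async_sessionmaker(replica, expire_on_commit=False)


async def get_session(readonly: bool = False):
    """Route reads to the replica and writes to the primary.

    Replica reads are eventually consistent; read-your-own-writes flows
    should stay on the primary session.
    """
    factory = ReadSession if readonly else WriteSession
    async with factory() as session:
        yield session
```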
Environment Coordination
Environment Coordination Patterns
Multi-Environment Migration Flow
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Local │───>│ CI │───>│ Staging │───>│ Production │
│ (dev) │ │ (test) │ │ (preview) │ │ (live) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
v v v v
alembic alembic alembic alembic
upgrade upgrade upgrade upgrade
head head head head
Environment-Specific Settings
# alembic/env.py
import os

from alembic import context
from sqlalchemy import text

def run_migrations_online():
    env = os.getenv("ENVIRONMENT", "development")
    if env == "production":
        # Guard production migrations with conservative timeouts
        # (connection comes from connectable.connect() earlier in env.py)
        connection.execute(text("SET statement_timeout = '30s'"))
        connection.execute(text("SET lock_timeout = '10s'"))
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            transaction_per_migration=True,
        )
    else:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )
Migration Locks (Prevent Concurrent Migrations)
"""Migration with advisory lock.
Prevents multiple instances from running migrations simultaneously.
"""
from alembic import op
from sqlalchemy import text
def upgrade():
# Acquire advisory lock (blocks until available)
op.execute(text("SELECT pg_advisory_lock(12345)"))
try:
op.create_table('new_table', ...)
finally:
# Release lock
op.execute(text("SELECT pg_advisory_unlock(12345)"))
Migration Numbering Schemes
Option 1: Sequential
001_initial_schema.sql
002_add_users.sql
003_add_orders.sql
Option 2: Timestamp
20260115120000_initial_schema.sql
20260116143000_add_users.sql
20260117091500_add_orders.sql
Option 3: Hybrid (Date + Sequence)
2026_01_15_001_initial_schema.sql
2026_01_15_002_add_users.sql
2026_01_16_001_add_orders.sql
Best Practices
| Practice | Reason |
|---|---|
| Always test migrations locally first | Catch errors early |
| Use transaction per migration | Atomic rollback on failure |
| Set lock timeouts in production | Prevent long-held locks |
| Never skip environments | Ensure consistency |
Migration Patterns
Database Migration Patterns
Overview
Database migrations evolve schema over time while preserving data and minimizing downtime. This guide covers zero-downtime strategies, backfill patterns, rollback planning, and Alembic best practices.
Zero-Downtime Migrations
The Problem
Traditional migrations lock tables during schema changes, causing downtime for users.
The Solution: Multi-Phase Migrations
Phase 1: Add New Column (Nullable)
-- Migration 1: Add column without constraints
ALTER TABLE analyses
ADD COLUMN content_summary TEXT; -- NULL allowed during transition
Why nullable? Existing rows don't have values yet. Adding NOT NULL would fail.
Phase 2: Backfill Data
-- Migration 2: Populate new column
UPDATE analyses
SET content_summary = LEFT(raw_content, 500)
WHERE content_summary IS NULL
AND raw_content IS NOT NULL;
-- For large tables, batch updates to avoid long locks
DO $$
DECLARE
batch_size INTEGER := 1000;
updated_rows INTEGER;
BEGIN
LOOP
UPDATE analyses
SET content_summary = LEFT(raw_content, 500)
WHERE id IN (
SELECT id FROM analyses
WHERE content_summary IS NULL AND raw_content IS NOT NULL
LIMIT batch_size
);
GET DIAGNOSTICS updated_rows = ROW_COUNT;
EXIT WHEN updated_rows = 0;
-- Brief pause to allow other transactions
PERFORM pg_sleep(0.1);
END LOOP;
END $$;
Phase 3: Add Constraints
-- Migration 3: Add NOT NULL constraint (safe after backfill)
ALTER TABLE analyses
ALTER COLUMN content_summary SET NOT NULL;
-- Add default for new rows
ALTER TABLE analyses
ALTER COLUMN content_summary SET DEFAULT '';
Real-World Example: OrchestKit's PII Columns
Migration: 20251210_add_pii_columns.py
def upgrade() -> None:
# Phase 1: Add nullable columns
op.execute(text("""
ALTER TABLE analysis_chunks
ADD COLUMN IF NOT EXISTS pii_flag BOOLEAN DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS pii_types JSONB
"""))
# Phase 2: Backfill (all existing chunks have pii_flag = FALSE)
op.execute(text("""
UPDATE analysis_chunks
SET pii_flag = FALSE
WHERE pii_flag IS NULL
"""))
# Phase 3: Add NOT NULL constraint
op.execute(text("""
ALTER TABLE analysis_chunks
ALTER COLUMN pii_flag SET NOT NULL
"""))Backfill Strategies
Small Tables (< 10,000 rows)
Strategy: Single UPDATE statement.
-- Fast enough for small tables
UPDATE artifacts
SET version = 1
WHERE version IS NULL;
Medium Tables (10,000 - 1,000,000 rows)
Strategy: Batched updates with progress tracking.
# Alembic migration
from alembic import op
from sqlalchemy import text
def upgrade() -> None:
# Add column
op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=True))
# Batched backfill
connection = op.get_bind()
batch_size = 1000
while True:
result = connection.execute(text("""
UPDATE analyses
SET content_summary = LEFT(raw_content, 500)
WHERE id IN (
SELECT id FROM analyses
WHERE content_summary IS NULL AND raw_content IS NOT NULL
LIMIT :batch_size
)
"""), {"batch_size": batch_size})
if result.rowcount == 0:
break
print(f"Backfilled {result.rowcount} rows")
Large Tables (> 1,000,000 rows)
Strategy: Background job + application-level backfill.
# Step 1: Add nullable column (migration)
def upgrade() -> None:
op.add_column('analysis_chunks', sa.Column('content_tsvector', TSVECTOR, nullable=True))
# Step 2: Create trigger for new rows (migration)
def upgrade() -> None:
op.execute(text("""
CREATE FUNCTION chunks_tsvector_update() RETURNS TRIGGER AS $$
BEGIN
NEW.content_tsvector :=
setweight(to_tsvector('english', COALESCE(NEW.section_title, '')), 'A') ||
setweight(to_tsvector('english', COALESCE(NEW.snippet, '')), 'B');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER chunks_tsvector_trigger
BEFORE INSERT OR UPDATE OF section_title, snippet ON analysis_chunks
FOR EACH ROW EXECUTE FUNCTION chunks_tsvector_update();
"""))
# Step 3: Backfill old rows (background script, not migration)
# Run: poetry run python scripts/backfill_chunks_tsvector.py
async def backfill_tsvector():
batch_size = 5000
total_backfilled = 0
while True:
async with get_db() as db:
result = await db.execute(text("""
UPDATE analysis_chunks
SET content_tsvector =
setweight(to_tsvector('english', COALESCE(section_title, '')), 'A') ||
setweight(to_tsvector('english', COALESCE(snippet, '')), 'B')
WHERE id IN (
SELECT id FROM analysis_chunks
WHERE content_tsvector IS NULL
LIMIT :batch_size
)
"""), {"batch_size": batch_size})
await db.commit()
if result.rowcount == 0:
break
total_backfilled += result.rowcount
print(f"Backfilled {total_backfilled} total rows")
OrchestKit Migration: 20251218_backfill_chunks_tsvector.py (actual example)
Rollback Planning
Always Include downgrade()
Every migration MUST have a rollback path.
def upgrade() -> None:
op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=True))
def downgrade() -> None:
op.drop_column('analyses', 'content_summary')
Destructive Rollbacks (Data Loss)
Warning: Some rollbacks cannot preserve data.
def upgrade() -> None:
# Split 'phone_numbers' TEXT into separate table
op.create_table('user_phones', ...)
# Migrate data from analyses.phone_numbers to user_phones
op.drop_column('analyses', 'phone_numbers')
def downgrade() -> None:
# DESTRUCTIVE: Cannot restore original 'phone_numbers' format
op.add_column('analyses', sa.Column('phone_numbers', sa.Text, nullable=True))
# Drop user_phones table (data loss!)
op.drop_table('user_phones')
Solution: Document data loss in migration docstring.
"""Split phone_numbers column into user_phones table.
Revision ID: abc123
Revises: xyz456
Create Date: 2025-12-21
WARNING: Downgrade will result in data loss. Phone number type
(mobile/home/work) cannot be restored to original format.
"""Testing Rollbacks
Always test both upgrade and downgrade paths:
# Apply migration
poetry run alembic upgrade head
# Test rollback
poetry run alembic downgrade -1
# Re-apply
poetry run alembic upgrade head
Alembic Best Practices
Migration File Naming
OrchestKit uses timestamp-based naming for clarity:
20251210_harden_embedding_pipeline.py
20251218_backfill_chunks_tsvector.py
Format: YYYYMMDD_description.py or YYYYMMDDHHmmss_description.py
Use text() for Raw SQL
Required for PostgreSQL-specific features (triggers, constraints, indexes).
from sqlalchemy import text
def upgrade() -> None:
# WRONG: Will fail with syntax errors
op.execute("""
CREATE INDEX idx_vector USING hnsw (vector vector_cosine_ops)
""")
# CORRECT: Use text() wrapper
op.execute(text("""
CREATE INDEX idx_vector
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
"""))Idempotent Migrations
Make migrations safe to re-run (important for development):
def upgrade() -> None:
# Add column only if it doesn't exist
op.execute(text("""
ALTER TABLE analyses
ADD COLUMN IF NOT EXISTS content_summary TEXT
"""))
# Create index only if it doesn't exist
op.execute(text("""
CREATE INDEX IF NOT EXISTS idx_analyses_url
ON analyses(url)
"""))
# Add constraint with existence check
op.execute(text("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'chk_granularity'
) THEN
ALTER TABLE analysis_chunks
ADD CONSTRAINT chk_granularity
CHECK (granularity IN ('coarse', 'fine', 'summary'));
END IF;
END $$;
"""))Separate Index Creation
For large tables, create indexes separately (not in transaction):
# Migration creates table without index
poetry run alembic upgrade head
# Manually create index CONCURRENTLY (no locks)
psql -d orchestkit -c "CREATE INDEX CONCURRENTLY idx_chunks_vector_hnsw ON analysis_chunks USING hnsw (vector vector_cosine_ops) WITH (m = 16, ef_construction = 64);"
Why? CREATE INDEX CONCURRENTLY cannot run inside transaction blocks (Alembic uses transactions by default).
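If the index should stay inside the migration file rather than a manual psql step, Alembic's autocommit_block is one option. A minimal sketch reusing the index definition above:

```python
from alembic import op
from sqlalchemy import text


def upgrade() -> None:
    # Runs outside the migration transaction, so CONCURRENTLY is allowed
    with op.get_context().autocommit_block():
        op.execute(text("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chunks_vector_hnsw
            ON analysis_chunks
            USING hnsw (vector vector_cosine_ops)
            WITH (m = 16, ef_construction = 64)
        """))
```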
Foreign Key Constraints with Cascades
Always specify ON DELETE behavior:
def upgrade() -> None:
# WRONG: No cascade (orphaned records)
op.create_foreign_key(
'fk_artifacts_analysis',
'artifacts', 'analyses',
['analysis_id'], ['id']
)
# CORRECT: Cascade deletes
op.execute(text("""
ALTER TABLE artifacts
ADD CONSTRAINT fk_artifacts_analysis
FOREIGN KEY (analysis_id) REFERENCES analyses(id)
ON DELETE CASCADE
"""))OrchestKit Example: 20251210_add_cascade_delete.py
def upgrade() -> None:
# Drop old constraint
op.execute(text("""
ALTER TABLE analysis_chunks
DROP CONSTRAINT IF EXISTS analysis_chunks_analysis_id_fkey
"""))
# Add new constraint with CASCADE
op.execute(text("""
ALTER TABLE analysis_chunks
ADD CONSTRAINT analysis_chunks_analysis_id_fkey
FOREIGN KEY (analysis_id) REFERENCES analyses(id)
ON DELETE CASCADE
"""))Trigger Functions
Pattern: Create reusable functions for common triggers.
def upgrade() -> None:
# Reusable function for updated_at
op.execute(text("""
CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""))
# Apply to multiple tables
for table in ['analyses', 'artifacts', 'analysis_chunks']:
op.execute(text(f"""
CREATE TRIGGER update_{table}_updated_at
BEFORE UPDATE ON {table}
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
"""))Migration Dependencies
Handle branching with merge migrations:
# Two developers create migrations in parallel
# branch-a: revision = "abc123", down_revision = "xyz456"
# branch-b: revision = "def789", down_revision = "xyz456"
# Create merge migration
poetry run alembic merge -m "merge sprint 12 migrations" abc123 def789
# Generates:
# revision = "merge123"
# down_revision = ("abc123", "def789")
OrchestKit Example: 500313d1cac9_merge_sprint_12_migrations.py
Common Pitfalls
1. Adding NOT NULL Without Default
# WRONG: Fails if table has existing rows
def upgrade():
op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=False))
# CORRECT: Add with default or nullable first
def upgrade():
op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=True))
# Backfill data...
op.alter_column('analyses', 'content_summary', nullable=False)
2. Renaming Columns Without Downtime
# Multi-phase approach:
# Phase 1: Add new column, populate from old column
op.add_column('analyses', sa.Column('url', sa.Text, nullable=True))
op.execute(text("UPDATE analyses SET url = old_url WHERE url IS NULL"))
# Deploy application code that reads/writes both columns
# Phase 2: Drop old column (after code deployment)
op.drop_column('analyses', 'old_url')
3. Changing Column Types
# WRONG: Fails if data is incompatible
op.alter_column('analyses', 'status', type_=sa.Integer)
# CORRECT: Multi-step with validation
op.add_column('analyses', sa.Column('status_new', sa.Integer, nullable=True))
op.execute(text("""
UPDATE analyses SET status_new =
CASE status
WHEN 'pending' THEN 0
WHEN 'complete' THEN 1
WHEN 'failed' THEN 2
END
"""))
op.drop_column('analyses', 'status')
op.alter_column('analyses', 'status_new', new_column_name='status')
Summary
| Pattern | Use When | Example |
|---|---|---|
| Zero-Downtime | Production systems | Add nullable → backfill → add constraint |
| Batched Backfill | Medium tables (10K-1M rows) | UPDATE in batches with LIMIT |
| Background Backfill | Large tables (>1M rows) | Trigger for new rows + script for old |
| Idempotent Migrations | Development environments | IF NOT EXISTS checks |
| CASCADE Deletes | Parent-child relationships | ON DELETE CASCADE |
| Triggers | Auto-update computed fields | updated_at, search_vector, tsvector |
| Merge Migrations | Parallel development | Combine divergent branches |
Golden Rules:
- Always test `upgrade()` and `downgrade()` locally
- Never add NOT NULL without backfilling first
- Use `text()` for PostgreSQL-specific SQL
- Document destructive rollbacks
- Create indexes CONCURRENTLY for large tables
Migration Testing
Migration Testing Patterns
Full Cycle Testing
# tests/test_migrations.py
import pytest
from alembic.config import Config
from alembic import command
from alembic.script import ScriptDirectory
@pytest.fixture
def alembic_config():
return Config("alembic.ini")
def test_migrations_upgrade_downgrade(alembic_config, test_db):
"""Test all migrations can be applied and rolled back."""
# Get all revisions
script = ScriptDirectory.from_config(alembic_config)
revisions = list(script.walk_revisions())
# Apply all migrations
command.upgrade(alembic_config, "head")
# Downgrade all migrations
command.downgrade(alembic_config, "base")
# Verify clean state
assert get_table_count(test_db) == 0
Checksum Verification
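The test below relies on a calculate_checksum helper and a DEPLOYED_MIGRATIONS mapping recorded at deploy time. A minimal sketch of the helper using hashlib (names follow the test, not a fixed API):

```python
import hashlib
from pathlib import Path


def calculate_checksum(path: str) -> str:
    """SHA-256 of a migration file, used to detect edits made after deployment."""
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()
```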
def test_migration_checksums(alembic_config):
"""Verify migrations haven't been modified after deployment."""
script = ScriptDirectory.from_config(alembic_config)
for revision in script.walk_revisions():
if revision.revision in DEPLOYED_MIGRATIONS:
current_checksum = calculate_checksum(revision.path)
expected_checksum = DEPLOYED_MIGRATIONS[revision.revision]
assert current_checksum == expected_checksum, \
f"Migration {revision.revision} was modified after deployment!"Data Integrity Testing
def test_migration_preserves_data(alembic_config, test_db):
"""Verify migration doesn't lose data."""
# Insert test data
insert_test_records(test_db, count=100)
original_count = get_record_count(test_db)
# Run migration
command.upgrade(alembic_config, "+1")
# Verify data preserved
new_count = get_record_count(test_db)
assert new_count == original_count
Rollback Testing
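The rollback test below compares schema snapshots taken before and after the downgrade. One way to sketch the get_schema_snapshot helper against information_schema (the helper name and scope are assumptions taken from the test):

```python
from sqlalchemy import text


def get_schema_snapshot(db) -> set[tuple]:
    """Capture (table, column, type, nullable) for every column in the public schema."""
    rows = db.execute(text("""
        SELECT table_name, column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = 'public'
        ORDER BY table_name, ordinal_position
    """))
    return {tuple(row) for row in rows}
```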
def test_rollback_safety(alembic_config, test_db):
"""Verify rollback restores previous state."""
# Get initial state
command.upgrade(alembic_config, "head")
pre_rollback_schema = get_schema_snapshot(test_db)
# Apply new migration
apply_pending_migration(alembic_config)
# Rollback
command.downgrade(alembic_config, "-1")
# Verify schema restored
post_rollback_schema = get_schema_snapshot(test_db)
assert pre_rollback_schema == post_rollback_schema
CI Integration
# .github/workflows/migrations.yml
migration-test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16
env:
POSTGRES_PASSWORD: test
steps:
- uses: actions/checkout@v4
- name: Test migrations
run: |
pytest tests/test_migrations.py -v
Normalization Patterns
Database Normalization Patterns
Overview
Normalization is the process of organizing data to reduce redundancy and improve data integrity. This guide covers normal forms, denormalization strategies, and modern patterns like JSON columns.
Normal Forms (1NF through BCNF)
First Normal Form (1NF)
Rule: Each column contains atomic (indivisible) values, and each row is unique.
Anti-pattern:
-- WRONG: Multiple values in one column
CREATE TABLE users (
id UUID PRIMARY KEY,
phone_numbers TEXT -- "555-1234, 555-5678, 555-9012"
);
Correct:
-- RIGHT: Atomic values only
CREATE TABLE users (
id UUID PRIMARY KEY,
name TEXT NOT NULL
);
CREATE TABLE user_phones (
id UUID PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
phone_number TEXT NOT NULL,
phone_type TEXT CHECK (phone_type IN ('mobile', 'home', 'work'))
);
Second Normal Form (2NF)
Rule: Must be in 1NF + no partial dependencies (all non-key columns depend on the entire primary key).
Anti-pattern:
-- WRONG: order_date depends only on order_id, not on (order_id, product_id)
CREATE TABLE order_items (
order_id UUID,
product_id UUID,
order_date TIMESTAMP, -- Partial dependency!
quantity INTEGER,
PRIMARY KEY (order_id, product_id)
);
Correct:
-- RIGHT: Separate tables for independent entities
CREATE TABLE orders (
id UUID PRIMARY KEY,
order_date TIMESTAMP NOT NULL,
customer_id UUID NOT NULL
);
CREATE TABLE order_items (
id UUID PRIMARY KEY,
order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
product_id UUID NOT NULL REFERENCES products(id),
quantity INTEGER NOT NULL CHECK (quantity > 0)
);
Third Normal Form (3NF)
Rule: Must be in 2NF + no transitive dependencies (non-key columns depend only on the primary key).
Anti-pattern:
-- WRONG: country_name depends on country_code, not on user_id
CREATE TABLE users (
id UUID PRIMARY KEY,
name TEXT NOT NULL,
country_code TEXT,
country_name TEXT -- Transitive dependency!
);
Correct:
-- RIGHT: Extract country data to separate table
CREATE TABLE countries (
code TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE
);
CREATE TABLE users (
id UUID PRIMARY KEY,
name TEXT NOT NULL,
country_code TEXT REFERENCES countries(code)
);
Boyce-Codd Normal Form (BCNF)
Rule: Must be in 3NF + every determinant is a candidate key.
When to use: Rare edge cases with overlapping candidate keys. Most applications stop at 3NF.
When to Denormalize for Performance
Read-Heavy Workloads
Pattern: Denormalize frequently joined data to reduce query complexity.
-- Normalized (slower reads, cleaner updates)
CREATE TABLE analyses (
id UUID PRIMARY KEY,
url TEXT NOT NULL,
created_at TIMESTAMP NOT NULL
);
CREATE TABLE artifacts (
id UUID PRIMARY KEY,
analysis_id UUID REFERENCES analyses(id) ON DELETE CASCADE,
markdown_content TEXT NOT NULL
);
-- Denormalized (faster reads, requires sync logic)
CREATE TABLE analyses (
id UUID PRIMARY KEY,
url TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
artifact_count INTEGER DEFAULT 0 -- Denormalized counter
);
-- Update trigger to maintain artifact_count
CREATE FUNCTION update_artifact_count() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
UPDATE analyses SET artifact_count = artifact_count + 1
WHERE id = NEW.analysis_id;
ELSIF TG_OP = 'DELETE' THEN
UPDATE analyses SET artifact_count = artifact_count - 1
WHERE id = OLD.analysis_id;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER artifact_count_trigger
AFTER INSERT OR DELETE ON artifacts
FOR EACH ROW EXECUTE FUNCTION update_artifact_count();
Computed Aggregates
OrchestKit Example: Download counts on artifacts table.
-- Denormalized counter (fast reads, eventual consistency)
CREATE TABLE artifacts (
id UUID PRIMARY KEY,
markdown_content TEXT NOT NULL,
download_count INTEGER DEFAULT 0 NOT NULL
);
-- Increment without locks (eventual consistency)
UPDATE artifacts SET download_count = download_count + 1 WHERE id = :artifact_id;
When NOT to denormalize:
- Frequent writes to denormalized fields (update overhead)
- Complex sync logic prone to bugs
- Data integrity is critical (financial, medical)
JSON Columns vs Normalized Tables
Use JSON Columns When:
- Schema is flexible/evolving (e.g., extraction metadata, API responses)
- Data is rarely queried individually (stored as opaque blob)
- Structure varies per row (e.g., different content types)
OrchestKit Examples:
-- extraction_metadata: Flexible schema, rarely queried
CREATE TABLE analyses (
id UUID PRIMARY KEY,
url TEXT NOT NULL,
extraction_metadata JSONB -- {"fetch_time_ms": 1234, "charset": "utf-8"}
);
-- artifact_metadata: Tags, topics, complexity (flexible)
CREATE TABLE artifacts (
id UUID PRIMARY KEY,
artifact_metadata JSONB -- {"topics": ["RAG", "LangGraph"], "complexity": "intermediate"}
);
-- Query JSONB with operators
SELECT * FROM artifacts
WHERE artifact_metadata @> '{"topics": ["RAG"]}'::jsonb;
-- Index JSONB for performance
CREATE INDEX idx_artifact_metadata_gin ON artifacts USING GIN (artifact_metadata);
Use Normalized Tables When:
- Need foreign key constraints (referential integrity)
- Frequent filtering/sorting on individual fields
- Complex queries (joins, aggregations)
OrchestKit Example:
-- agent_findings: Structured data with foreign key
CREATE TABLE agent_findings (
id UUID PRIMARY KEY,
analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
agent_type TEXT NOT NULL,
findings JSONB NOT NULL, -- Hybrid: FK + JSONB
confidence_score FLOAT
);
-- analysis_chunks: Fully normalized for vector search
CREATE TABLE analysis_chunks (
id UUID PRIMARY KEY,
analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
section_title TEXT,
snippet TEXT,
vector VECTOR(1536), -- Cannot use JSONB for vector ops
content_tsvector TSVECTOR -- Cannot use JSONB for full-text search
);
Indexing Strategies
B-Tree Indexes (Default)
Use for: Equality, range queries, sorting.
-- Single-column indexes
CREATE INDEX idx_analyses_url ON analyses(url);
CREATE INDEX idx_analyses_status ON analyses(status);
-- Composite indexes (order matters!)
CREATE INDEX idx_chunks_analysis_granularity
ON analysis_chunks(analysis_id, granularity);
-- Query uses index: WHERE analysis_id = X AND granularity = Y
-- Query uses index (partial): WHERE analysis_id = X
-- Query does NOT use index: WHERE granularity = Y (wrong column order)
Rule of Thumb: Put high-selectivity columns first (columns that filter out the most rows).
GIN Indexes (Inverted Indexes)
Use for: Full-text search (TSVECTOR), JSONB, arrays.
-- Full-text search
CREATE INDEX idx_analyses_search_vector
ON analyses USING GIN(search_vector);
-- JSONB containment queries
CREATE INDEX idx_artifact_metadata_gin
ON artifacts USING GIN(artifact_metadata);
-- Array containment
CREATE INDEX idx_tags_gin
ON articles USING GIN(tags);
HNSW Indexes (Vector Similarity)
Use for: Approximate nearest neighbor search (embeddings).
-- OrchestKit: Semantic search on chunks
CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Parameters:
-- m = 16: Connections per layer (higher = more accurate, slower)
-- ef_construction = 64: Build quality (higher = better recall, slower indexing)
Partial Indexes
Use for: Filter frequently queried subsets.
-- Only index completed analyses (common query pattern)
CREATE INDEX idx_analyses_completed
ON analyses(created_at)
WHERE status = 'complete';
-- Only index chunks with content_type (90% have it)
CREATE INDEX idx_chunks_content_type_created
ON analysis_chunks(content_type, created_at DESC)
WHERE content_type IS NOT NULL;
Covering Indexes (Index-Only Scans)
Use for: Include all queried columns in the index.
-- Query: SELECT id, title FROM analyses WHERE status = 'complete' ORDER BY created_at DESC
CREATE INDEX idx_analyses_status_covering
ON analyses(status, created_at DESC)
INCLUDE (id, title);
-- PostgreSQL can satisfy entire query from index (no table access)
Index Maintenance
Monitoring Index Usage
-- Unused indexes (candidates for removal)
SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE idx_scan = 0
AND indexname NOT LIKE '%_pkey';
-- Index size
SELECT indexname, pg_size_pretty(pg_relation_size(indexname::regclass))
FROM pg_indexes
WHERE schemaname = 'public';
Reindexing
-- Rebuild index (fixes bloat, updates statistics)
REINDEX INDEX CONCURRENTLY idx_chunks_vector_hnsw;
-- Rebuild all indexes on table
REINDEX TABLE CONCURRENTLY analysis_chunks;
Anti-Patterns to Avoid
Over-Indexing
Problem: Every index slows down writes (INSERT/UPDATE/DELETE).
Rule: Only create indexes for actual query patterns. Remove unused indexes.
JSONB for Everything
Problem: Loss of type safety, no foreign keys, harder to query.
Rule: Use normalized tables for structured, queryable data. JSONB for flexible/opaque data.
Premature Denormalization
Problem: Complexity without proven performance need.
Rule: Start normalized. Denormalize only after profiling shows bottlenecks.
Missing Indexes on Foreign Keys
Problem: Slow joins and cascading deletes.
-- ALWAYS index foreign keys
CREATE TABLE artifacts (
id UUID PRIMARY KEY,
analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE
);
-- REQUIRED for performance
CREATE INDEX idx_artifacts_analysis_id ON artifacts(analysis_id);
Summary
| Pattern | Use When | Example |
|---|---|---|
| 3NF | Default for structured data | users, orders, products |
| Denormalize | Read-heavy, proven bottleneck | download_count, artifact_count |
| JSONB | Flexible schema, opaque data | extraction_metadata, API responses |
| Normalized Tables | Foreign keys, complex queries | analysis_chunks, agent_findings |
| B-Tree Index | Equality, ranges, sorting | status, created_at, foreign keys |
| GIN Index | Full-text, JSONB, arrays | search_vector, metadata |
| HNSW Index | Vector similarity (embeddings) | vector columns |
| Partial Index | Frequent filtered queries | WHERE status = 'complete' |
Golden Rule: Start normalized (3NF), measure performance, then selectively denormalize based on evidence.
Object Versioning
Database Object Versioning
Stored Procedure Versioning
-- Version tracking for stored procedures
CREATE TABLE procedure_versions (
id SERIAL PRIMARY KEY,
procedure_name VARCHAR(200) NOT NULL,
version VARCHAR(20) NOT NULL,
definition TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by VARCHAR(100),
is_current BOOLEAN DEFAULT TRUE,
CONSTRAINT uq_procedure_version UNIQUE (procedure_name, version)
);
-- Before updating a procedure, archive it
INSERT INTO procedure_versions (procedure_name, version, definition, created_by)
SELECT
'calculate_order_total',
'1.0.0',
pg_get_functiondef(oid),
current_user
FROM pg_proc
WHERE proname = 'calculate_order_total';
View Versioning
-- Versioned views with _v1, _v2 naming
CREATE VIEW orders_summary_v1 AS
SELECT order_id, customer_id, total
FROM orders;
CREATE VIEW orders_summary_v2 AS
SELECT order_id, customer_id, total, shipping_cost, tax
FROM orders;
-- Current version alias
CREATE VIEW orders_summary AS
SELECT * FROM orders_summary_v2;
Reference Data Versioning
"""Reference data migration with versioning."""
from alembic import op
from sqlalchemy.sql import table, column
import sqlalchemy as sa
revision = 'ref001'
status_codes = table(
'status_codes',
column('code', sa.String),
column('name', sa.String),
column('description', sa.String),
column('version', sa.Integer),
column('is_active', sa.Boolean)
)
def upgrade():
# Deactivate old version
op.execute(
status_codes.update()
.where(status_codes.c.version == 1)
.values(is_active=False)
)
# Insert new version
op.bulk_insert(status_codes, [
{'code': 'PENDING', 'name': 'Pending', 'version': 2, 'is_active': True},
{'code': 'PROCESSING', 'name': 'Processing', 'version': 2, 'is_active': True},
{'code': 'COMPLETED', 'name': 'Completed', 'version': 2, 'is_active': True},
{'code': 'FAILED', 'name': 'Failed', 'version': 2, 'is_active': True},
])
def downgrade():
op.execute(status_codes.delete().where(status_codes.c.version == 2))
op.execute(status_codes.update().where(status_codes.c.version == 1).values(is_active=True))
Versioning Strategy
| Object Type | Strategy | Notes |
|---|---|---|
| Procedures | Version table | Archive before update |
| Views | _v1, _v2 naming | Alias for current |
| Reference data | Version column | Soft delete old |
PostgreSQL vs MongoDB
PostgreSQL vs MongoDB
Head-to-head comparison for the most common database decision. TL;DR: PostgreSQL wins for almost every use case.
Feature Comparison
| Feature | PostgreSQL | MongoDB |
|---|---|---|
| Data model | Relational + JSONB | Document (BSON) |
| Schema | Enforced (with flexible JSONB) | Schema-less (optional validation) |
| Transactions | Full ACID, multi-table | Multi-document (since 4.0, slower) |
| Joins | Native, optimized | $lookup (slow, limited) |
| JSON support | JSONB with GIN indexes | Native BSON |
| Full-text search | Built-in tsvector | Atlas Search (paid) |
| Geospatial | PostGIS (industry standard) | Built-in (basic) |
| Aggregation | SQL (window functions, CTEs) | Aggregation pipeline (verbose) |
| Replication | Streaming replication | Replica sets |
| Sharding | Citus extension / partitioning | Built-in (complex to operate) |
| License | PostgreSQL License (permissive) | SSPL (restrictive) |
| Hosting | Every cloud, many managed options | Atlas (MongoDB Inc.) or self-host |
PostgreSQL JSONB: The MongoDB Killer
PostgreSQL's JSONB type handles document workloads with full SQL power:
-- Store JSON documents
CREATE TABLE products (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL
);
-- Index nested fields
CREATE INDEX idx_products_category ON products USING GIN ((data->'category'));
-- Query JSON with SQL
SELECT data->>'name' AS name, data->'price' AS price
FROM products
WHERE data @> '{"category": "electronics"}'
AND (data->>'price')::numeric < 500
ORDER BY (data->>'price')::numeric;
-- Combine relational joins WITH JSON queries
SELECT u.email, p.data->>'name'
FROM users u
JOIN products p ON p.data->>'seller_id' = u.id::text
WHERE p.data @> '{"in_stock": true}';
Key advantages over MongoDB:
- ACID transactions across relational and JSON data in one query
- JOIN JSON documents with relational tables
- GIN indexes on JSONB are fast and flexible
- Partial indexes on JSON fields for targeted performance
- No vendor lock-in: MongoDB's SSPL restricts where it can be hosted; PostgreSQL has no such limits
When MongoDB Actually Makes Sense
These cases are rare but legitimate:
- Rapidly evolving schemas: Data shape changes multiple times per week, and you never need cross-document joins. Example: IoT with hundreds of device types each sending different telemetry shapes.
- Existing MongoDB codebase: Migration cost exceeds benefit. The team has deep MongoDB operational expertise and the application works well.
- Content catalogs with embedded data: Self-contained documents that are always read/written as a whole, never joined. Example: a product catalog where each product document contains all its variants, reviews, and metadata.
- Time-series with variable schemas: Each data point has different fields. Note: TimescaleDB (PostgreSQL extension) often handles this better.
Common MongoDB Pitfalls
| Pitfall | What Happens | PostgreSQL Equivalent |
|---|---|---|
| No joins | Denormalize everything, data duplication | Native JOINs |
| Schema drift | Documents with inconsistent fields | Schema enforcement |
| $lookup performance | Cross-collection queries are slow | Optimized JOIN planner |
| Transaction overhead | Multi-doc transactions are 2-5x slower than single-doc | Negligible transaction overhead |
| SSPL license | Cannot offer as managed service | Permissive license |
| ObjectId ordering | Time-based, leaks creation timestamps | UUID v7 or SERIAL |
Migration: MongoDB to PostgreSQL
See references/migration-paths.md for detailed migration strategies.
Quick summary:
- Map collections to tables (or JSONB columns for truly flexible data)
- Extract commonly queried fields into typed columns
- Use `mongodump` + transformation scripts + `COPY` for data migration
- Rewrite aggregation pipelines as SQL queries (usually simpler; see the sketch below)
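A minimal sketch of one such rewrite, against a hypothetical orders collection (table and column names are illustrative):

```python
from sqlalchemy import text

# MongoDB pipeline being replaced:
#   [{"$match": {"status": "complete"}},
#    {"$group": {"_id": "$customer_id", "total": {"$sum": "$amount"}}}]
ORDER_TOTALS_SQL = text("""
    SELECT customer_id, SUM(amount) AS total
    FROM orders
    WHERE status = 'complete'
    GROUP BY customer_id
""")
```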
Storage and CMS
Storage and CMS Database Selection
Guidance for choosing databases in content management and file storage contexts.
CMS Backend Database Selection
| CMS Type | Recommended Database | Rationale |
|---|---|---|
| Traditional CMS (WordPress-like) | PostgreSQL | Structured content with relationships, taxonomies, user roles |
| Headless CMS (API-first) | PostgreSQL | Content types, versioning, localization all benefit from relational model |
| Blog / Documentation | PostgreSQL or SQLite | Simple schema; SQLite works for single-author static-gen |
| E-commerce catalog | PostgreSQL | Products, variants, inventory, orders — heavily relational |
| User-generated content | PostgreSQL | Moderation workflows, reporting, search — needs JOINs and transactions |
Why Not MongoDB for CMS?
The "documents for content" intuition is wrong. CMS content is deeply relational:
- Content references other content (related posts, linked products)
- Taxonomies (categories, tags) are many-to-many relationships
- User roles and permissions are relational
- Content versioning needs transactions
- Localization multiplies content with locale relationships
PostgreSQL JSONB handles the flexible parts (custom fields, metadata) while maintaining relational integrity for the structural parts.
File and Blob Storage
Rule: Never store large files in a database.
| Storage Type | Use | Technology |
|---|---|---|
| File metadata | Database (PostgreSQL) | Store filename, size, mime type, S3 key, upload timestamp |
| Small files (< 1 MB) | Database acceptable | Profile avatars, thumbnails — BYTEA column or JSONB base64 |
| Medium files (1-100 MB) | Object storage | S3, GCS, R2, MinIO — store URL/key in database |
| Large files (100 MB+) | Object storage + multipart | S3 multipart upload, presigned URLs |
| Temporary files | Object storage with lifecycle | S3 lifecycle rules for auto-deletion |
Recommended Architecture
User Upload --> API Server --> Object Storage (S3/R2/GCS)
|
v
PostgreSQL (metadata only)
- file_id, s3_key, filename
- mime_type, size_bytes
- uploaded_by, uploaded_at
This pattern:
- Keeps database small and fast (metadata only)
- Leverages CDN for file delivery (CloudFront, Cloudflare)
- Enables presigned URLs for direct upload/download (bypasses API server)
- Works with any object storage provider (no vendor lock-in)
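A minimal sketch of the presigned-URL flow from the list above, using boto3 against any S3-compatible store (bucket name, key scheme, and expiry are illustrative):

```python
import boto3

s3 = boto3.client("s3")  # For R2/MinIO, also pass endpoint_url and credentials


def create_upload_url(key: str, content_type: str) -> str:
    """Presigned PUT URL so clients upload directly to object storage."""
    return s3.generate_presigned_url(
        "put_object",
        Params={"Bucket": "app-uploads", "Key": key, "ContentType": content_type},
        ExpiresIn=900,  # 15 minutes
    )

# After the client confirms the upload, store only the metadata row
# (s3_key, filename, mime_type, size_bytes, uploaded_by) in PostgreSQL.
```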
When to Use Database for Storage
Acceptable cases for storing data directly in PostgreSQL:
- Small, frequently accessed blobs: User avatars under 100 KB, stored as `BYTEA`
- Generated documents: PDF invoices, reports — if total volume is small (< 10 GB)
- Configuration files: YAML/JSON configs under 1 MB
- Embedded SQLite: Single-user apps where adding object storage is overkill
Object Storage Comparison
| Provider | Free Tier | Cost (per GB/mo) | Egress | Best For |
|---|---|---|---|---|
| Cloudflare R2 | 10 GB | $0.015 | Free | Cost-sensitive, no egress fees |
| AWS S3 | 5 GB (12 months) | $0.023 | $0.09/GB | AWS ecosystem, mature tooling |
| GCS | 5 GB | $0.020 | $0.12/GB | GCP ecosystem |
| MinIO | Unlimited (self-hosted) | Infrastructure cost | N/A | On-premise, air-gapped |
| Supabase Storage | 1 GB | $0.021 | Included in plan | Already using Supabase |
Default recommendation: Cloudflare R2 for new projects (zero egress fees), S3 for AWS-native stacks.
Checklists (2)
Migration Checklist
Migration Deployment Checklist
Verification steps for safe database migration deployment.
Pre-Deployment Checks
Code Review
- Migration file has descriptive docstring with purpose
- `revision` and `down_revision` are correct
- `upgrade()` contains all necessary changes
- `downgrade()` properly reverses all changes (tested!)
- No hardcoded environment-specific values
- Large table operations use `CONCURRENTLY` where applicable
Schema Validation
- Run `alembic check` - no pending model changes
- Generate SQL: `alembic upgrade head --sql > migration.sql`
- Review generated SQL for unexpected operations
- Verify column types match SQLAlchemy model definitions
- Check constraint names follow naming conventions
Backward Compatibility
- New columns are `nullable=True` or have `server_default`
- No column/table renames (use expand-contract pattern)
- No `NOT NULL` constraints added to existing columns with data
- Application code works with both old and new schema
- API responses unchanged (or versioned)
Rollback Testing
Local Rollback Verification
# Apply migration
alembic upgrade head
# Verify schema change
psql -c "\d tablename"
# Rollback migration
alembic downgrade -1
# Verify rollback complete
psql -c "\d tablename"
# Re-apply to confirm idempotency
alembic upgrade head
Rollback Checklist
- `alembic downgrade -1` succeeds without errors
- Data is preserved after rollback (if applicable)
- Indexes and constraints are properly removed
- Triggers and functions are cleaned up
- Application functions correctly after rollback
Data Backup Verification
Before Production Migration
- Full database backup completed
- Backup verified (can restore to test environment)
- Point-in-time recovery configured (if using RDS/Cloud SQL)
- Backup retention policy confirmed (minimum 7 days)
- Document backup timestamp and location
Backup Commands
# PostgreSQL backup
pg_dump -Fc -v -h $DB_HOST -U $DB_USER -d $DB_NAME > backup_$(date +%Y%m%d_%H%M%S).dump
# Verify backup
pg_restore --list backup_*.dump | head -20
# Test restore to separate database
createdb restore_test
pg_restore -d restore_test backup_*.dump
Production Deployment Steps
1. Pre-Flight (T-30 minutes)
- Notify team of upcoming migration window
- Verify backup completed successfully
- Check current migration version: `alembic current`
- Review migration history: `alembic history -v`
- Confirm rollback plan documented
2. Deployment Execution
# Generate SQL for final review
alembic upgrade head --sql > /tmp/migration_$(date +%Y%m%d).sql
# Review SQL one more time
cat /tmp/migration_$(date +%Y%m%d).sql
# Apply migration with timing
time alembic upgrade head
# Verify new version
alembic current
3. Post-Migration Verification
- Check `alembic current` shows expected revision
- Verify schema changes with `\d tablename`
- Run smoke tests against API endpoints
- Check application logs for database errors
- Monitor database metrics (connections, query latency)
- Verify no increase in error rates
4. Rollback Procedure (If Needed)
# Immediate rollback
alembic downgrade -1
# Verify rollback
alembic current
# Notify team of rollback
# Investigate and fix before retry
Large Table Migration Checklist
Additional Checks for Tables > 1M Rows
- Estimated migration duration calculated
- `CONCURRENTLY` used for index operations
- Batch processing implemented for data migrations
- Lock wait timeout configured: `SET lock_timeout = '5s'`
- Statement timeout configured: `SET statement_timeout = '30m'`
- Maintenance window scheduled (if blocking operations)
Monitoring During Migration
- Active queries: `SELECT * FROM pg_stat_activity WHERE state = 'active'`
- Lock monitoring: `SELECT * FROM pg_locks WHERE NOT granted`
- Table bloat after migration
- Replication lag (if applicable)
Emergency Contacts
| Role | Contact | Escalation |
|---|---|---|
| DBA On-Call | [Slack/Phone] | Database issues |
| Backend Lead | [Slack/Phone] | Application issues |
| Infrastructure | [Slack/Phone] | Connection/network |
Post-Deployment Tasks
- Update documentation if schema changed significantly
- Close related tickets/issues
- Schedule VACUUM ANALYZE if large changes
- Archive migration SQL for audit trail
- Confirm monitoring alerts are not firing
Schema Design Checklist
Database Schema Design Checklist
Use this checklist when designing or reviewing database schemas.
Pre-Design
- Requirements Gathered: Understand data entities and relationships
- Access Patterns Identified: Know how data will be queried
- SQL vs NoSQL Decision: Chosen appropriate database type
- Scale Estimate: Expected data volume and growth rate
- Read/Write Ratio: Understand if read-heavy or write-heavy
Normalization (SQL Databases)
- 1NF: Atomic values, no repeating groups
- 2NF: No partial dependencies on composite keys
- 3NF: No transitive dependencies
- Denormalization Justified: If denormalized, reason documented
Table Design
Primary Keys
- Primary Key Defined: Every table has primary key
- Key Type Chosen: INT auto-increment or UUID
- Meaningful Keys Avoided: Not using email/username as primary key
Data Types
- Appropriate Types: Correct data types for each column
- String Sizes: VARCHAR sized appropriately (not always 255)
- Numeric Precision: DECIMAL for money, INT for counts
- Dates in UTC: TIMESTAMP for datetime columns
- Boolean Type: Using BOOLEAN or TINYINT(1)
Constraints
- NOT NULL: Required columns marked NOT NULL
- Unique Constraints: Unique columns (email, username)
- Check Constraints: Validation rules (price >= 0)
- Default Values: Sensible defaults where appropriate
Relationships
Foreign Keys
- Foreign Keys Defined: All relationships have FK constraints
- ON DELETE Strategy: CASCADE, RESTRICT, SET NULL chosen appropriately
- ON UPDATE Strategy: Usually CASCADE
- Indexes on Foreign Keys: FKs are indexed
Relationship Types
- One-to-Many: Modeled correctly
- Many-to-Many: Junction table created
- Self-Referencing: Parent-child relationships handled
- Polymorphic: Strategy chosen (separate FKs or type+id)
Indexing
Index Strategy
- Primary Key Indexed: Automatic, verify
- Foreign Keys Indexed: All FKs have indexes
- WHERE Columns: Columns in WHERE clauses indexed
- ORDER BY Columns: Sort columns indexed
- Composite Indexes: Multi-column queries optimized
- Column Order: Most selective/queried column first
Index Types
- B-Tree: Used for ranges and equality (default)
- Hash: Used for exact matches only (if applicable)
- Full-Text: Used for text search (if needed)
- Partial Indexes: Conditional indexes (PostgreSQL)
Index Limits
- Not Over-Indexed: Only necessary indexes created
- Index Maintenance: Aware of write performance impact
Performance Considerations
Query Optimization
- Joins Optimized: N+1 queries avoided
- SELECT * Avoided: Only fetch needed columns
- Pagination: LIMIT/OFFSET or cursor-based
- Aggregations: Pre-calculated for expensive queries
Scalability
- Sharding Strategy: Planned for large datasets
- Partitioning: Tables partitioned by date/range (if applicable)
- Read Replicas: Planned for read-heavy workloads
- Caching Layer: Application-level caching considered
Data Integrity
Validation
- Database-Level Validation: Constraints enforce rules
- Application-Level Validation: Additional checks in code
- Foreign Key Constraints: Referential integrity enforced
- Unique Constraints: Duplicate prevention
Consistency
- Transactions: ACID properties for critical operations
- Cascading Deletes: Data cleanup strategy
- Soft Deletes: deleted_at column if needed
- Audit Trail: created_at, updated_at timestamps
Migrations
Migration Safety
- Backward Compatible: New columns nullable initially
- Up and Down Migrations: Rollback scripts provided
- Data Migrations Separate: Schema vs data changes separated
- Tested on Staging: Migrations tested on production copy
- Duration Estimated: Large migrations timed
Zero-Downtime
- Add Before Remove: New column added before removing old
- Gradual Rollout: Multi-step migrations for large changes
- Feature Flags: Large changes behind flags
Security
Access Control
- Principle of Least Privilege: Users have minimum permissions
- Separate Accounts: Read-only vs read-write accounts
- No Root Access: Application doesn't use root/admin account
Data Protection
- Sensitive Data Encrypted: Passwords hashed, PII encrypted
- SQL Injection Prevention: Parameterized queries only
- No Secrets in Code: Credentials in environment variables
NoSQL Considerations (if applicable)
Document Design
- Embed vs Reference: Strategy chosen based on access patterns
- Document Size: Within limits (16MB for MongoDB)
- Schema Validation: Validation rules defined (if supported)
- Indexes: Appropriate indexes on query fields
Scaling
- Sharding Key: Chosen for even distribution
- Replication: Read replicas configured
- Consistency Level: Chosen appropriately (eventual vs strong)
Documentation
- ERD Created: Entity-relationship diagram
- Schema Documented: Column descriptions and purpose
- Indexes Documented: Why each index exists
- Relationships Explained: Business logic behind relationships
- Migration History: Changelog of schema changes
Testing
- Sample Data: Test data created
- Query Performance: Slow queries identified (EXPLAIN)
- Load Testing: Performance under expected load
- Edge Cases: NULL, empty, max values tested
Common Pitfalls to Avoid
- ❌ Using VARCHAR(255) for everything
- ❌ No foreign key constraints
- ❌ Missing indexes on foreign keys
- ❌ Over-indexing (index on every column)
- ❌ FLOAT for money values
- ❌ Storing dates as strings
- ❌ No created_at/updated_at timestamps
- ❌ No soft delete strategy
- ❌ Non-reversible migrations
- ❌ Breaking changes without migration plan
Pre-Deployment Checklist
- Migrations Reviewed: Peer-reviewed by team
- Rollback Plan: Tested rollback procedures
- Monitoring Setup: Slow query logging enabled
- Backups: Backup before major schema changes
- Alerts Configured: Alerts for query performance degradation
Checklist Version: 1.0.0 Skill: database-schema-designer v1.0.0 Last Updated: 2025-10-31
Examples (2)
Alembic Examples
Alembic Migration Examples
Complete, production-ready migration examples.
Add Column with Default Value
"""Add organization_id to users with default.
Revision ID: add_org_001
Revises: previous_rev
Create Date: 2026-01-15
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = 'add_org_001'
down_revision = 'previous_rev'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add column as nullable first (no lock)
op.add_column('users',
sa.Column('organization_id', UUID(as_uuid=True), nullable=True)
)
# Add foreign key constraint
op.create_foreign_key(
'fk_users_organization',
'users', 'organizations',
['organization_id'], ['id'],
ondelete='SET NULL'
)
# Backfill with default organization (in separate transaction for large tables)
op.execute("""
UPDATE users
SET organization_id = (SELECT id FROM organizations WHERE is_default = true)
WHERE organization_id IS NULL
""")
def downgrade() -> None:
op.drop_constraint('fk_users_organization', 'users', type_='foreignkey')
op.drop_column('users', 'organization_id')
Create Index CONCURRENTLY
"""Add composite index for user search queries.
Revision ID: idx_user_search
Revises: add_org_001
Create Date: 2026-01-15
Note: Uses CONCURRENTLY to avoid blocking reads/writes.
Migration must run outside transaction.
"""
from alembic import op
revision = 'idx_user_search'
down_revision = 'add_org_001'
branch_labels = None
depends_on = None
def upgrade() -> None:
# CRITICAL: Exit transaction for CONCURRENTLY
op.execute("COMMIT")
# Composite index for common query pattern
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org_status_created
ON users (organization_id, status, created_at DESC)
WHERE deleted_at IS NULL
""")
# GIN index for full-text search on name
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_name_trgm
ON users USING gin (name gin_trgm_ops)
""")
def downgrade() -> None:
op.execute("COMMIT")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_name_trgm")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org_status_created")
Data Migration with Batching
"""Migrate user preferences from JSON to normalized table.
Revision ID: migrate_prefs
Revises: idx_user_search
Create Date: 2026-01-15
Handles millions of rows with batch processing.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
import json
revision = 'migrate_prefs'
down_revision = 'idx_user_search'
branch_labels = None
depends_on = None
BATCH_SIZE = 5000
def upgrade() -> None:
# Create new preferences table
op.create_table('user_preferences',
sa.Column('id', UUID(as_uuid=True), primary_key=True,
server_default=sa.text('gen_random_uuid()')),
sa.Column('user_id', UUID(as_uuid=True), nullable=False),
sa.Column('key', sa.String(100), nullable=False),
sa.Column('value', sa.Text, nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True),
server_default=sa.func.now()),
sa.Column('updated_at', sa.DateTime(timezone=True),
server_default=sa.func.now()),
)
# Add constraints
op.create_foreign_key('fk_user_prefs_user', 'user_preferences', 'users',
['user_id'], ['id'], ondelete='CASCADE')
op.create_unique_constraint('uq_user_prefs_user_key', 'user_preferences',
['user_id', 'key'])
# Migrate data in batches
conn = op.get_bind()
offset = 0
while True:
# Fetch batch of users with preferences
result = conn.execute(sa.text("""
SELECT id, preferences
FROM users
WHERE preferences IS NOT NULL
AND preferences != '{}'::jsonb
ORDER BY id
LIMIT :batch_size OFFSET :offset
"""), {'batch_size': BATCH_SIZE, 'offset': offset})
rows = result.fetchall()
if not rows:
break
# Transform and insert preferences
for user_id, prefs in rows:
if prefs:
pref_dict = prefs if isinstance(prefs, dict) else json.loads(prefs)
for key, value in pref_dict.items():
conn.execute(sa.text("""
INSERT INTO user_preferences (user_id, key, value)
VALUES (:user_id, :key, :value)
ON CONFLICT (user_id, key) DO NOTHING
"""), {'user_id': user_id, 'key': key, 'value': str(value)})
conn.commit()
offset += BATCH_SIZE
print(f"Migrated {offset} users...")
# Create index after data load (faster than during inserts)
op.create_index('idx_user_prefs_user_id', 'user_preferences', ['user_id'])
def downgrade() -> None:
# Migrate data back to JSON column
conn = op.get_bind()
conn.execute(sa.text("""
UPDATE users u
SET preferences = (
SELECT jsonb_object_agg(key, value)
FROM user_preferences up
WHERE up.user_id = u.id
)
WHERE EXISTS (
SELECT 1 FROM user_preferences WHERE user_id = u.id
)
"""))
conn.commit()
op.drop_table('user_preferences')
Enum Type Changes
"""Add new status values to user_status enum.
Revision ID: enum_status
Revises: migrate_prefs
Create Date: 2026-01-15
PostgreSQL enum modification requires special handling.
"""
from alembic import op
revision = 'enum_status'
down_revision = 'migrate_prefs'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add new values to existing enum
op.execute("ALTER TYPE user_status ADD VALUE IF NOT EXISTS 'suspended'")
op.execute("ALTER TYPE user_status ADD VALUE IF NOT EXISTS 'pending_verification'")
# Note: Cannot remove enum values in PostgreSQL
# For removal, must recreate enum type (see downgrade for pattern)
def downgrade() -> None:
# Recreate enum without new values (complex operation)
# First, update any rows using new values
op.execute("""
UPDATE users
SET status = 'inactive'
WHERE status IN ('suspended', 'pending_verification')
""")
# Rename old enum
op.execute("ALTER TYPE user_status RENAME TO user_status_old")
# Create new enum without new values
op.execute("CREATE TYPE user_status AS ENUM ('active', 'inactive', 'deleted')")
# Update column to use new enum
op.execute("""
ALTER TABLE users
ALTER COLUMN status TYPE user_status
USING status::text::user_status
""")
# Drop old enum
op.execute("DROP TYPE user_status_old")
Add Table with Partitioning
"""Add partitioned events table for analytics.
Revision ID: events_partition
Revises: enum_status
Create Date: 2026-01-15
Uses PostgreSQL native partitioning for time-series data.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
revision = 'events_partition'
down_revision = 'enum_status'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create partitioned parent table
op.execute("""
CREATE TABLE events (
id UUID DEFAULT gen_random_uuid(),
event_type VARCHAR(100) NOT NULL,
user_id UUID,
payload JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at)
""")
# Create initial partitions (monthly)
op.execute("""
CREATE TABLE events_2026_01 PARTITION OF events
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01')
""")
op.execute("""
CREATE TABLE events_2026_02 PARTITION OF events
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01')
""")
op.execute("""
CREATE TABLE events_2026_03 PARTITION OF events
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01')
""")
# Create index on parent (propagates to partitions)
# Note: CREATE INDEX CONCURRENTLY is not supported on a partitioned parent;
# the partitions are still empty at this point, so a regular CREATE INDEX is cheap
op.execute("""
CREATE INDEX IF NOT EXISTS idx_events_user_type
ON events (user_id, event_type, created_at DESC)
""")
# Add FK constraint
op.create_foreign_key('fk_events_user', 'events', 'users',
['user_id'], ['id'], ondelete='SET NULL')
def downgrade() -> None:
op.drop_table('events_2026_03')
op.drop_table('events_2026_02')
op.drop_table('events_2026_01')
op.drop_table('events')
Rename Column Safely (Expand-Contract)
"""Phase 1: Add email_address alongside email.
Revision ID: rename_email_p1
Revises: events_partition
Create Date: 2026-01-15
Safe column rename using expand-contract pattern.
Run Phase 2 after application code is updated.
"""
from alembic import op
import sqlalchemy as sa
revision = 'rename_email_p1'
down_revision = 'events_partition'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add new column
op.add_column('users', sa.Column('email_address', sa.String(255), nullable=True))
# Copy existing data
op.execute("UPDATE users SET email_address = email")
# Create sync trigger for transition period
op.execute("""
CREATE OR REPLACE FUNCTION sync_user_email()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.email IS DISTINCT FROM OLD.email THEN
NEW.email_address = NEW.email;
ELSIF NEW.email_address IS DISTINCT FROM OLD.email_address THEN
NEW.email = NEW.email_address;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_sync_email
BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_user_email();
""")
def downgrade() -> None:
op.execute("DROP TRIGGER IF EXISTS trg_sync_email ON users")
op.execute("DROP FUNCTION IF EXISTS sync_user_email()")
op.drop_column('users', 'email_address')
OrchestKit Database Schema
OrchestKit Database Schema
Overview
OrchestKit uses PostgreSQL with PGVector for storing technical content analysis results, embeddings, and artifacts. The schema is optimized for:
- Semantic search (vector similarity with HNSW indexing)
- Full-text search (PostgreSQL tsvector with GIN indexing)
- Hybrid search (RRF fusion of semantic + keyword results)
- Content chunking (hierarchical storage of document sections)
Core Tables
analyses - Content Analysis Records
Purpose: Stores URLs being analyzed, their content, embeddings, and processing status.
CREATE TABLE analyses (
-- Primary key
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- Required fields
url TEXT NOT NULL,
content_type VARCHAR(50) NOT NULL, -- 'article', 'video', 'repo'
status VARCHAR(50) NOT NULL DEFAULT 'pending', -- 'pending', 'processing', 'complete', 'failed'
-- Optional fields
title TEXT,
raw_content TEXT,
-- Vector embedding (OpenAI text-embedding-3-small: 1536 dimensions)
content_embedding VECTOR(1536),
-- Full-text search vector (auto-populated by trigger)
search_vector TSVECTOR,
-- Flexible metadata
extraction_metadata JSONB, -- {"fetch_time_ms": 1234, "charset": "utf-8"}
-- Context Engineering (Issue #244 - Handle Pattern)
content_summary TEXT, -- LLM-generated summary for lightweight state refs
content_sections JSONB, -- {"code_blocks": [...], "headings": [...], "word_count": 5000}
-- Timestamps
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_analyses_url ON analyses(url);
CREATE INDEX idx_analyses_status ON analyses(status);
-- GIN index for full-text search
CREATE INDEX idx_analyses_search_vector ON analyses USING GIN(search_vector);
-- HNSW index for vector similarity search
CREATE INDEX idx_analyses_embedding_hnsw
ON analyses
USING hnsw (content_embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Partial index for common query (only completed analyses)
CREATE INDEX idx_analyses_completed
ON analyses(created_at DESC)
WHERE status = 'complete';
-- Trigger: Auto-update search_vector on INSERT/UPDATE
CREATE TRIGGER analyses_search_vector_trigger
BEFORE INSERT OR UPDATE OF title, url, raw_content ON analyses
FOR EACH ROW EXECUTE FUNCTION analyses_search_vector_update();
Design Decisions:
- UUID Primary Key: Better for distributed systems, avoids sequential ID guessing
- JSONB for Metadata: Flexible schema for extraction metadata (fetch time, charset, etc.)
- Vector + TSVector: Dual search strategy (semantic + keyword)
- Partial Index: 90% of queries filter by status = 'complete'
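A quick illustration of how the dual search strategy and the completed-only filter combine in practice; as with the chunk queries later in this document, :query_embedding is a placeholder bound by the application:
-- Sketch: semantic search over completed analyses
SELECT id, url, title, 1 - (content_embedding <=> :query_embedding) AS similarity
FROM analyses
WHERE status = 'complete'
ORDER BY content_embedding <=> :query_embedding
LIMIT 10;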
artifacts - Generated Implementation Guides
Purpose: Stores markdown documents generated from analysis results.
CREATE TABLE artifacts (
-- Primary key
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- Foreign key (cascade delete when analysis is deleted)
analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
-- Required fields
markdown_content TEXT NOT NULL,
version INTEGER NOT NULL DEFAULT 1 CHECK (version > 0),
-- Flexible metadata
artifact_metadata JSONB, -- {"topics": ["RAG", "LangGraph"], "complexity": "intermediate"}
-- Telemetry
download_count INTEGER NOT NULL DEFAULT 0 CHECK (download_count >= 0),
trace_id VARCHAR(255), -- Langfuse trace ID for observability
-- Timestamps
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_artifacts_analysis_id ON artifacts(analysis_id);
CREATE INDEX idx_artifacts_trace_id ON artifacts(trace_id);
-- GIN index for JSONB queries (e.g., find all artifacts with topic "RAG")
CREATE INDEX idx_artifacts_metadata_gin ON artifacts USING GIN(artifact_metadata);
Design Decisions:
- CASCADE Delete: When an analysis is deleted, its artifacts are automatically removed
- Version Field: Track artifact revisions (future: regenerate guides)
- JSONB Metadata: Topics, tags, complexity stored as flexible JSON
- Denormalized Counter: download_count for performance (no JOINs needed)
Query Examples:
-- Find all RAG-related artifacts
SELECT * FROM artifacts
WHERE artifact_metadata @> '{"topics": ["RAG"]}'::jsonb;
-- Get most downloaded artifacts
SELECT id, artifact_metadata->>'complexity', download_count
FROM artifacts
ORDER BY download_count DESC
LIMIT 10;
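Because download_count is a denormalized counter, increments should happen in a single SQL statement rather than a read-modify-write in application code. A minimal sketch (:artifact_id is a placeholder bound by the application):
-- Sketch: increment the denormalized counter atomically
UPDATE artifacts
SET download_count = download_count + 1
WHERE id = :artifact_id
RETURNING download_count;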
analysis_chunks - Chunk-Level Embeddings
Purpose: Store individual chunks of analyzed content with embeddings for granular semantic search.
CREATE TABLE analysis_chunks (
-- Primary key
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- Foreign key
analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
-- Chunking metadata
granularity VARCHAR(20) NOT NULL CHECK (granularity IN ('coarse', 'fine', 'summary')),
path JSONB NOT NULL, -- ["section", "subsection", "chunk"]
section_title TEXT,
chunk_idx INTEGER NOT NULL CHECK (chunk_idx >= 0),
chunk_total INTEGER NOT NULL CHECK (chunk_total > 0),
-- Content metadata
content_type VARCHAR(50), -- Denormalized for filtering
language VARCHAR(20),
-- Deduplication
hash VARCHAR(128) NOT NULL, -- SHA256 hash of normalized content
-- Embedding metadata
model VARCHAR(100), -- 'text-embedding-3-small'
model_version VARCHAR(50),
-- Content preview
snippet TEXT, -- First ~200 chars
-- Vector embedding
vector VECTOR(1536) NOT NULL,
-- Full-text search vector (auto-populated by trigger)
content_tsvector TSVECTOR,
-- Telemetry fields
token_count INTEGER,
embedding_latency_ms FLOAT,
was_truncated BOOLEAN DEFAULT FALSE,
-- PII metadata (Issue #220)
pii_flag BOOLEAN NOT NULL DEFAULT FALSE,
pii_types JSONB, -- ["email", "phone_us"] (never actual PII values)
-- Timestamps
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
-- Table-level constraints
CONSTRAINT chk_chunk_idx_lt_total CHECK (chunk_idx < chunk_total)
);
-- Indexes
CREATE INDEX idx_chunks_analysis_id ON analysis_chunks(analysis_id);
CREATE INDEX idx_chunks_hash ON analysis_chunks(hash);
-- Composite indexes
CREATE INDEX idx_chunks_analysis_granularity ON analysis_chunks(analysis_id, granularity);
CREATE INDEX idx_chunks_hash_model ON analysis_chunks(hash, model, model_version);
-- GIN index for full-text search
CREATE INDEX idx_chunks_content_tsvector ON analysis_chunks USING GIN(content_tsvector);
-- HNSW index for vector similarity search
CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Partial index (90% of chunks have content_type)
CREATE INDEX idx_chunks_content_type_created
ON analysis_chunks(content_type, created_at DESC)
WHERE content_type IS NOT NULL;
-- Triggers
CREATE TRIGGER chunks_tsvector_trigger
BEFORE INSERT OR UPDATE OF section_title, snippet ON analysis_chunks
FOR EACH ROW EXECUTE FUNCTION chunks_tsvector_update();
CREATE TRIGGER update_chunks_updated_at
BEFORE UPDATE ON analysis_chunks
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
Design Decisions:
- Granularity Levels: coarse (sections), fine (paragraphs), summary (TL;DR)
- Hierarchical Path: JSONB array for section navigation (["section", "subsection"])
- Hash-Based Deduplication: Avoid re-embedding identical content
- Denormalized content_type: Faster filtering without JOIN to analyses
- Telemetry Fields: Track token usage, latency for cost analysis
- PII Detection: Flag chunks with PII (email, phone) for audit compliance
Query Examples:
-- Semantic search within an analysis
SELECT id, section_title, snippet, 1 - (vector <=> :query_embedding) AS similarity
FROM analysis_chunks
WHERE analysis_id = :analysis_id
ORDER BY vector <=> :query_embedding
LIMIT 10;
-- Full-text search
SELECT id, section_title, ts_rank_cd(content_tsvector, query) AS rank
FROM analysis_chunks, to_tsquery('english', 'machine & learning') AS query
WHERE analysis_id = :analysis_id AND content_tsvector @@ query
ORDER BY rank DESC
LIMIT 10;
-- Hybrid search (RRF fusion) - see chunk_repository.py
-- Combines semantic + keyword results with reciprocal rank fusion
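The repository implementation is not reproduced here, but reciprocal rank fusion can be expressed directly in SQL. A hedged sketch, assuming the conventional k = 60 and the same :analysis_id, :query_embedding, and :query_text placeholders; the production logic in chunk_repository.py may differ:
-- Sketch of RRF fusion over semantic and keyword rankings (k = 60)
WITH semantic AS (
    SELECT id, ROW_NUMBER() OVER (ORDER BY vector <=> :query_embedding) AS rnk
    FROM analysis_chunks
    WHERE analysis_id = :analysis_id
    ORDER BY vector <=> :query_embedding
    LIMIT 20
),
keyword AS (
    SELECT id, ROW_NUMBER() OVER (
               ORDER BY ts_rank_cd(content_tsvector, plainto_tsquery('english', :query_text)) DESC
           ) AS rnk
    FROM analysis_chunks
    WHERE analysis_id = :analysis_id
      AND content_tsvector @@ plainto_tsquery('english', :query_text)
    ORDER BY ts_rank_cd(content_tsvector, plainto_tsquery('english', :query_text)) DESC
    LIMIT 20
)
SELECT id,
       COALESCE(1.0 / (60 + s.rnk), 0) + COALESCE(1.0 / (60 + k.rnk), 0) AS rrf_score
FROM semantic s
FULL OUTER JOIN keyword k USING (id)
ORDER BY rrf_score DESC
LIMIT 10;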
agent_findings - Multi-Agent Analysis Results
Purpose: Store specialized agent outputs (Tech Comparator, Security Auditor, etc.).
CREATE TABLE agent_findings (
-- Primary key
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- Foreign key
analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
-- Agent metadata
agent_type VARCHAR(100) NOT NULL, -- 'tech_comparator', 'security_auditor', etc.
-- Findings (flexible schema)
findings JSONB NOT NULL,
-- Confidence score
confidence_score FLOAT,
-- Telemetry
processing_time_ms INTEGER,
-- Timestamps
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_agent_findings_analysis_id ON agent_findings(analysis_id);
CREATE INDEX idx_agent_findings_agent_type ON agent_findings(agent_type);
Design Decisions:
- JSONB Findings: Each agent has different output structure (flexibility over schema rigidity)
- Agent Type Index: Fast filtering by agent type
- No Versioning: Findings are immutable (re-run analysis creates new record)
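Because findings are immutable and re-runs append new rows, the most recent result per agent can be selected with DISTINCT ON. A minimal sketch:
-- Sketch: latest finding per agent type for one analysis
SELECT DISTINCT ON (agent_type)
       agent_type, findings, confidence_score, created_at
FROM agent_findings
WHERE analysis_id = :analysis_id
ORDER BY agent_type, created_at DESC;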
Supporting Tables
annotation_queue - Human-in-the-Loop Feedback
Purpose: Queue chunks/artifacts for human review and feedback collection.
CREATE TABLE annotation_queue (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
artifact_id UUID NOT NULL REFERENCES artifacts(id) ON DELETE CASCADE,
chunk_id UUID REFERENCES analysis_chunks(id) ON DELETE SET NULL,
annotation_type VARCHAR(50) NOT NULL, -- 'quality', 'relevance', 'accuracy'
status VARCHAR(50) NOT NULL DEFAULT 'pending',
feedback JSONB,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
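One common way to hand pending items to multiple reviewers without double-assigning is FOR UPDATE SKIP LOCKED; a hedged sketch only, since the application may consume the queue differently and the 'in_review' status value is a hypothetical example:
-- Sketch: claim the next pending annotation without blocking other reviewers
-- ('in_review' is a hypothetical status value for illustration)
UPDATE annotation_queue
SET status = 'in_review', updated_at = NOW()
WHERE id = (
    SELECT id FROM annotation_queue
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, artifact_id, chunk_id, annotation_type;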
agent_memory - Persistent Agent Memory
Purpose: Store agent learnings across sessions (RAG for agent improvements).
CREATE TABLE agent_memory (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
agent_type VARCHAR(100) NOT NULL,
memory_type VARCHAR(50) NOT NULL, -- 'success', 'failure', 'pattern'
content TEXT NOT NULL,
embedding VECTOR(1536),
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
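Retrieval follows the same pattern as chunk search: filter by agent_type and order by vector distance. A minimal sketch, with :agent_type and :query_embedding as application-supplied placeholders:
-- Sketch: retrieve the most similar past learnings for an agent
SELECT id, memory_type, content, 1 - (embedding <=> :query_embedding) AS similarity
FROM agent_memory
WHERE agent_type = :agent_type
ORDER BY embedding <=> :query_embedding
LIMIT 5;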
Embedding Storage Pattern
Why PGVector?
- Co-location: Embeddings live alongside metadata (no separate vector DB)
- ACID Guarantees: Transactions ensure consistency
- Hybrid Search: Native support for keyword + semantic search
- Cost-Effective: No additional infrastructure (Redis, Pinecone, Weaviate)
HNSW Index Configuration
CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
Parameters:
- m = 16: Connections per layer (a good balance of speed and accuracy)
- ef_construction = 64: Build quality (higher = better recall, slower indexing)
Query-Time Tuning:
-- Increase search quality (default ef_search = 40)
SET hnsw.ef_search = 100;
-- Then run semantic search
SELECT * FROM analysis_chunks
ORDER BY vector <=> :query_embedding
LIMIT 10;
Embedding Model Details
- Model: OpenAI text-embedding-3-small
- Dimensions: 1536
- Distance Metric: Cosine similarity (vector_cosine_ops)
- Normalization: Vectors are normalized (unit length) before storage
Full-Text Search Pattern
Weighted TSVector
-- Trigger function (analyses table)
CREATE FUNCTION analyses_search_vector_update() RETURNS TRIGGER AS $$
BEGIN
NEW.search_vector :=
setweight(to_tsvector('english', COALESCE(NEW.title, '')), 'A') || -- Highest weight
setweight(to_tsvector('english', COALESCE(NEW.url, '')), 'B') ||
setweight(to_tsvector('english', COALESCE(NEW.raw_content, '')), 'C');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Weights:
- A: Most important (title)
- B: Medium importance (URL)
- C: Lower importance (content)
- D: Lowest importance (not used)
Ranking Function
-- ts_rank_cd (cover density) - better for most use cases
SELECT id, ts_rank_cd(search_vector, query) AS rank
FROM analyses, to_tsquery('english', 'machine & learning') AS query
WHERE search_vector @@ query
ORDER BY rank DESC;
Relationship Modeling
Cascade Deletes
All child tables use ON DELETE CASCADE to maintain referential integrity:
analyses (parent)
├── artifacts (ON DELETE CASCADE)
├── analysis_chunks (ON DELETE CASCADE)
└── agent_findings (ON DELETE CASCADE)
artifacts (parent)
└── annotation_queue (ON DELETE CASCADE)
analysis_chunks (parent)
└── annotation_queue (ON DELETE SET NULL) -- Optional reference
Example: Deleting an Analysis
-- Single DELETE cascades to all children
DELETE FROM analyses WHERE id = :analysis_id;
-- Automatically deletes:
-- - All artifacts for this analysis
-- - All chunks for this analysis
-- - All agent findings for this analysis
-- - All annotation queue entries for these artifacts/chunks
Migration History
Key Migrations
- e3c50d69e442_enable_pgvector.py
  - Enable PGVector extension
- a37ac3b6a635_initial_schema.py
  - Create analyses, artifacts, agent_findings tables
- 20251204091348_add_fulltext_search.py
  - Add search_vector to analyses
  - Create GIN index + trigger for auto-update
  - Add HNSW index for content_embedding
- 20251209120000_add_analysis_chunks.py
  - Create analysis_chunks table
- 20251210_harden_embedding_pipeline.py
  - Add HNSW index on analysis_chunks.vector
  - Add content_tsvector with GIN index
  - Add telemetry fields (token_count, embedding_latency_ms)
  - Add CHECK constraints (chk_granularity, chk_chunk_idx_lt_total)
  - Add trigger for auto-updating content_tsvector
- 20251210_add_cascade_delete.py
  - Update foreign keys to ON DELETE CASCADE
- 20251210_add_pii_columns.py
  - Add pii_flag and pii_types to analysis_chunks
- 20251218_backfill_chunks_tsvector.py
  - Backfill content_tsvector for existing chunks (batched updates)
Performance Characteristics
Table Sizes (Golden Dataset)
- Analyses: 98 rows
- Artifacts: 98 rows
- Chunks: 415 rows
Index Sizes
- HNSW Indexes: ~2-3x vector data size
- GIN Indexes: ~40-60% of indexed text size
- B-Tree Indexes: ~30-40% of indexed column size
Query Performance
| Query Type | Latency (p95) | Notes |
|---|---|---|
| Semantic search (chunks) | <50ms | HNSW index with m=16 |
| Full-text search (chunks) | <30ms | GIN index on tsvector |
| Hybrid search (chunks) | <100ms | RRF fusion of both |
| Analysis by URL | <10ms | B-tree index on url |
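These figures assume the planner actually picks the HNSW and GIN indexes; a quick way to verify on a given environment (a verification sketch, not part of the schema):
-- Sketch: confirm the HNSW index is used for semantic search
EXPLAIN (ANALYZE, BUFFERS)
SELECT id
FROM analysis_chunks
ORDER BY vector <=> :query_embedding
LIMIT 10;
-- The plan should show an index scan on idx_chunks_vector_hnsw rather than a sequential scan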
Best Practices Applied
- Indexed All Foreign Keys: Every FK has a B-tree index for join performance
- Cascade Deletes: Parent-child relationships use ON DELETE CASCADE
- Triggers for Computed Fields: updated_at, search_vector, content_tsvector
- Partial Indexes: Filter common query patterns (WHERE status = 'complete')
- JSONB for Flexible Data: Metadata, findings, hierarchical paths
- Normalized Tables for Structured Data: Chunks, artifacts, findings
- Denormalization for Performance: download_count, content_type in chunks
- Telemetry Fields: Track token usage, latency, PII detection
- CHECK Constraints: Data integrity (granularity, chunk_idx < chunk_total)
- UUID Primary Keys: Better for distributed systems, no sequential guessing
Future Enhancements
- Materialized Views: Pre-computed aggregates for dashboards (see the sketch after this list)
- Partitioning: Partition analysis_chunks by analysis_id (when >10M rows)
- Read Replicas: Scale read queries across multiple PostgreSQL instances
- Connection Pooling: PgBouncer for reduced connection overhead
- Archive Old Analyses: Move completed analyses older than 90 days to cold storage
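A hedged sketch of what such a dashboard aggregate could look like; the view name and columns are hypothetical and not part of the current schema:
-- Sketch only: a possible dashboard aggregate (mv_daily_analysis_stats is hypothetical)
CREATE MATERIALIZED VIEW mv_daily_analysis_stats AS
SELECT date_trunc('day', created_at) AS day,
       content_type,
       COUNT(*) AS analyses,
       COUNT(*) FILTER (WHERE status = 'complete') AS completed
FROM analyses
GROUP BY 1, 2;

-- Refresh on a schedule; CONCURRENTLY requires a unique index on the view
CREATE UNIQUE INDEX ON mv_daily_analysis_stats (day, content_type);
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_daily_analysis_stats;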