OrchestKit v6.7.1 — 67 skills, 38 agents, 77 hooks with Opus 4.6 support

Database Patterns

Database design and migration patterns for Alembic migrations, schema design (SQL/NoSQL), and database versioning. Use when creating migrations, designing schemas, normalizing data, managing database versions, or handling schema drift.


Primary Agent: database-engineer

Database Patterns

Comprehensive patterns for database migrations, schema design, and version management. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
| --- | --- | --- | --- |
| Alembic Migrations | 3 | CRITICAL | Autogenerate, data migrations, branch management |
| Schema Design | 3 | HIGH | Normalization, indexing strategies, NoSQL patterns |
| Versioning | 3 | HIGH | Changelogs, rollback plans, schema drift detection |
| Zero-Downtime Migration | 2 | CRITICAL | Expand-contract, pgroll, rollback monitoring |
| Database Selection | 1 | HIGH | Choosing the right database, PostgreSQL vs MongoDB, cost analysis |

Total: 12 rules across 5 categories

Quick Start

# Alembic: Auto-generate migration from model changes
# alembic revision --autogenerate -m "add user preferences"

def upgrade() -> None:
    op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))
    op.execute("UPDATE users SET org_id = 'default-org-uuid' WHERE org_id IS NULL")

def downgrade() -> None:
    op.drop_column('users', 'org_id')

-- Schema: Normalization to 3NF with proper indexing
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID NOT NULL REFERENCES customers(id),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_orders_customer_id ON orders(customer_id);

Alembic Migrations

Migration management with Alembic for SQLAlchemy 2.0 async applications.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Autogenerate | rules/alembic-autogenerate.md | Auto-generate from models, async env.py, review workflow |
| Data Migration | rules/alembic-data-migration.md | Batch backfill, two-phase NOT NULL, zero-downtime |
| Branching | rules/alembic-branching.md | Feature branches, merge migrations, conflict resolution |

Schema Design

SQL and NoSQL schema design with normalization, indexing, and constraint patterns.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Normalization | rules/schema-normalization.md | 1NF-3NF, when to denormalize, JSON vs normalized |
| Indexing | rules/schema-indexing.md | B-tree, GIN, HNSW, partial/covering indexes |
| NoSQL Patterns | rules/schema-nosql.md | Embed vs reference, document design, sharding |

Versioning

Database version control and change management across environments.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Changelog | rules/versioning-changelog.md | Schema version table, semantic versioning, audit trails |
| Rollback | rules/versioning-rollback.md | Rollback testing, destructive rollback docs, CI verification |
| Drift Detection | rules/versioning-drift.md | Environment sync, checksum verification, migration locks |
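The checksum verification mentioned for drift detection boils down to comparing canonical fingerprints of each environment's schema. A minimal sketch, assuming an order- and case-insensitive canonicalization (the rule file's exact algorithm may differ):

```python
import hashlib

def schema_checksum(ddl_statements: list[str]) -> str:
    """Fingerprint a schema from its DDL, insensitive to statement order and case."""
    canonical = "\n".join(sorted(s.strip().lower() for s in ddl_statements))
    return hashlib.sha256(canonical.encode()).hexdigest()

staging = [
    "CREATE TABLE users (id UUID PRIMARY KEY)",
    "CREATE INDEX idx_users_email ON users(email)",
]
prod = list(reversed(staging))  # same schema, dumped in a different order

assert schema_checksum(staging) == schema_checksum(prod)   # no drift
drifted = staging + ["ALTER TABLE users ADD COLUMN nickname TEXT"]
assert schema_checksum(staging) != schema_checksum(drifted)  # drift detected
```

Comparing the two checksums in CI catches environments that were patched by hand instead of through migrations.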

Database Selection

Decision frameworks for choosing the right database. Default: PostgreSQL.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Selection Guide | rules/db-selection.md | PostgreSQL-first, tier-based matrix, anti-patterns |

Key Decisions

| Decision | Recommendation | Rationale |
| --- | --- | --- |
| Async dialect | postgresql+asyncpg | Native async support for SQLAlchemy 2.0 |
| NOT NULL column | Two-phase: nullable first, then alter | Avoids locking, backward compatible |
| Large table index | CREATE INDEX CONCURRENTLY | Zero-downtime, no table locks |
| Normalization target | 3NF for OLTP | Reduces redundancy while maintaining query performance |
| Primary key strategy | UUID for distributed, INT for single-DB | Context-appropriate key generation |
| Soft deletes | deleted_at timestamp column | Preserves audit trail, enables recovery |
| Migration granularity | One logical change per file | Easier rollback and debugging |
| Production deployment | Generate SQL, review, then apply | Never auto-run in production |
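The soft-delete row deserves a quick illustration of its semantics. A hypothetical sketch using plain dicts (in SQL this is an `UPDATE ... SET deleted_at = now()` plus a `WHERE deleted_at IS NULL` filter on every read):

```python
from datetime import datetime, timezone

def soft_delete(row: dict) -> None:
    """Mark a row deleted without losing it: stamp deleted_at, keep the data."""
    row["deleted_at"] = datetime.now(timezone.utc)

def active(rows: list[dict]) -> list[dict]:
    """Equivalent of `WHERE deleted_at IS NULL` in every read query."""
    return [r for r in rows if r.get("deleted_at") is None]

users = [{"id": 1, "deleted_at": None}, {"id": 2, "deleted_at": None}]
soft_delete(users[1])
assert [u["id"] for u in active(users)] == [1]  # hidden from reads
assert users[1]["deleted_at"] is not None       # preserved for audit/recovery
```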

Anti-Patterns (FORBIDDEN)

# NEVER: Add NOT NULL without default or two-phase approach
op.add_column('users', sa.Column('org_id', UUID, nullable=False))  # LOCKS TABLE!

# NEVER: Use blocking index creation on large tables
op.create_index('idx_large', 'big_table', ['col'])  # Use CONCURRENTLY

# NEVER: Skip downgrade implementation
def downgrade():
    pass  # WRONG - implement proper rollback

# NEVER: Modify migration after deployment - create new migration instead

# NEVER: Run migrations automatically in production
# Use: alembic upgrade head --sql > review.sql

# NEVER: Run CONCURRENTLY inside transaction
op.execute("BEGIN; CREATE INDEX CONCURRENTLY ...; COMMIT;")  # FAILS

# NEVER: Delete migration history
command.stamp(alembic_config, "head")  # Loses history

# NEVER: Skip environments (Always: local -> CI -> staging -> production)

Detailed Documentation

| Resource | Description |
| --- | --- |
| references/ | Advanced patterns: Alembic, normalization, migration, audit, environment, versioning |
| checklists/ | Migration deployment and schema design checklists |
| examples/ | Complete migration examples, schema examples |
| scripts/ | Migration templates, model change detector |

Zero-Downtime Migration

Safe database schema changes without downtime using expand-contract pattern and online schema changes.

| Rule | File | Key Pattern |
| --- | --- | --- |
| Expand-Contract | rules/migration-zero-downtime.md | Expand phase, backfill, contract phase, pgroll automation |
| Rollback & Monitoring | rules/migration-rollback.md | pgroll rollback, lock monitoring, replication lag, backfill progress |
Related Skills

  • sqlalchemy-2-async - Async SQLAlchemy session patterns
  • ork:testing-patterns - Comprehensive testing patterns including migration testing
  • caching - Cache layer design to complement database performance
  • ork:performance - Performance optimization patterns

Rules (12)

Configure Alembic autogenerate for safe schema migration from model changes — CRITICAL

Alembic Autogenerate Migrations

Async env.py Configuration

# migrations/env.py
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

# Import your models' Base for autogenerate
from app.models.base import Base

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata

def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url, target_metadata=target_metadata,
        literal_binds=True, dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection: Connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations() -> None:
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.", poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Generate and Apply

# Generate from model changes
alembic revision --autogenerate -m "add user preferences"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1

# Generate SQL for review (production)
alembic upgrade head --sql > migration.sql

# Check current revision
alembic current

# Show migration history
alembic history --verbose

Migration Template

"""Add users table.

Revision ID: abc123
Revises: None
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

revision = 'abc123'
down_revision = None

def upgrade() -> None:
    op.create_table('users',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index('idx_users_email', 'users', ['email'], unique=True)

def downgrade() -> None:
    op.drop_index('idx_users_email', table_name='users')
    op.drop_table('users')

Key Decisions

| Decision | Recommendation | Rationale |
| --- | --- | --- |
| Generated SQL | Always review before applying | Autogenerate is a starting point; it may miss custom constraints |
| Migration scope | One logical change per file | Atomic units are easier to roll back and debug |
| Downgrade | Always implement | Required for safe rollback |
| Async dialect | postgresql+asyncpg | Native async support; SQLAlchemy 2.0 standard |

Incorrect — Missing target_metadata:

# env.py without target_metadata
from alembic import context

target_metadata = None  # Autogenerate won't work!

def run_migrations_online():
    context.configure(target_metadata=target_metadata)

Correct — Import all models via Base:

# env.py with proper metadata import
from app.models.base import Base  # Imports all SQLAlchemy models
from alembic import context

target_metadata = Base.metadata  # Autogenerate detects changes

def run_migrations_online():
    context.configure(target_metadata=target_metadata)

Common Mistakes

  • Skipping review of autogenerated migrations
  • Running autogenerate without importing all models
  • Using asyncio.run() when event loop already exists (FastAPI lifespan)
  • Missing target_metadata configuration
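The asyncio.run() pitfall above bites when migrations are invoked programmatically from a process that already has a loop running (e.g. a FastAPI lifespan hook). One illustrative workaround — run_coro_in_any_context and the thread-pool fallback are an assumption of this sketch, not Alembic's API:

```python
import asyncio
import concurrent.futures

def run_coro_in_any_context(coro_factory):
    """Run an async callable whether or not an event loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running (plain `alembic upgrade head` from the CLI): safe.
        return asyncio.run(coro_factory())
    # A loop is already running (e.g. FastAPI lifespan). asyncio.run() would
    # raise RuntimeError here, so execute on a fresh loop in a worker thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro_factory()).result()
```

In env.py, `run_migrations_online` could then call `run_coro_in_any_context(run_async_migrations)` instead of `asyncio.run(run_async_migrations())`.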

Handle Alembic migration branch conflicts with proper merge strategies — CRITICAL

Alembic Branching & Merge Patterns

Creating Feature Branches

# Create a feature branch
alembic revision --branch-label=feature_payments -m "start payments feature"

# Create revision on branch
alembic revision --head=feature_payments@head -m "add payment_methods table"

# View branch structure
alembic branches

# Merge branches before deployment
alembic merge feature_payments@head main@head -m "merge payments feature"

Merge Migration

"""Merge feature_payments and main branches.

Revision ID: merge_abc
Revises: ('abc123', 'def456')
"""
revision = 'merge_abc'
down_revision = ('abc123', 'def456')  # Tuple for merge

def upgrade() -> None:
    pass  # No operations - marks merge point

def downgrade() -> None:
    raise Exception("Cannot downgrade past merge point")

Multi-Database Migrations

# alembic/env.py - Multi-database support
DATABASES = {
    'default': 'postgresql://user:pass@localhost/main',
    'analytics': 'postgresql://user:pass@localhost/analytics',
}

def run_migrations_online():
    for db_name, url in DATABASES.items():
        config = context.config
        config.set_main_option('sqlalchemy.url', url)
        connectable = engine_from_config(
            config.get_section(config.config_ini_section),
            prefix='sqlalchemy.', poolclass=pool.NullPool,
        )
        with connectable.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=get_metadata(db_name),
                version_table=f'alembic_version_{db_name}',
            )
            with context.begin_transaction():
                context.run_migrations()

Column Rename (Expand-Contract)

"""Phase 1: Add new column alongside old."""
def upgrade() -> None:
    op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))

    # Create trigger to sync during transition
    op.execute("""
        CREATE OR REPLACE FUNCTION sync_user_name()
        RETURNS TRIGGER AS $$
        BEGIN
            NEW.full_name = COALESCE(NEW.full_name, NEW.name);
            NEW.name = COALESCE(NEW.name, NEW.full_name);
            RETURN NEW;
        END; $$ LANGUAGE plpgsql;

        CREATE TRIGGER trg_sync_user_name
        BEFORE INSERT OR UPDATE ON users
        FOR EACH ROW EXECUTE FUNCTION sync_user_name();
    """)

def downgrade() -> None:
    op.execute("DROP TRIGGER IF EXISTS trg_sync_user_name ON users")
    op.execute("DROP FUNCTION IF EXISTS sync_user_name()")
    op.drop_column('users', 'full_name')

Key Decisions

| Decision | Recommendation | Rationale |
| --- | --- | --- |
| Column rename | 4-phase expand/contract | Safe migration without downtime |
| Branch merge | Merge before deployment | Prevents version conflicts |
| Multi-database | Separate version tables | Independent migration tracking |
| Transaction mode | Default on, disable for CONCURRENTLY | CONCURRENTLY requires no transaction |

Incorrect — Immediate column rename:

# Causes downtime - app breaks immediately
def upgrade():
    op.alter_column('users', 'name', new_column_name='full_name')

Correct — Expand-contract pattern:

# Phase 1: Add new column, sync with trigger
def upgrade():
    op.add_column('users', sa.Column('full_name', sa.String(255)))
    op.execute("""
        CREATE TRIGGER trg_sync_user_name
        BEFORE INSERT OR UPDATE ON users
        FOR EACH ROW EXECUTE FUNCTION sync_user_name()
    """)
# Phase 2 (separate migration): Drop old column after app updated

Common Mistakes

  • Merging branches without testing both paths first
  • Skipping expand-contract for column renames (causes downtime)
  • Using shared version table for multiple databases
  • Not using --branch-label for feature isolation

Run Alembic data migrations safely with batch processing and two-phase approaches — CRITICAL

Alembic Data Migration Patterns

Two-Phase NOT NULL Migration

"""Add org_id column (phase 1 - nullable).

Phase 1: Add nullable column
Phase 2: Backfill data
Phase 3: Add NOT NULL (separate migration after verification)
"""
def upgrade() -> None:
    # Phase 1: Add as nullable first
    op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))

    # Phase 2: Backfill with default org
    op.execute("""
        UPDATE users SET org_id = 'default-org-uuid' WHERE org_id IS NULL
    """)

    # Phase 3 in SEPARATE migration after app updated:
    # op.alter_column('users', 'org_id', nullable=False)

def downgrade() -> None:
    op.drop_column('users', 'org_id')

Batch Processing Pattern

BATCH_SIZE = 1000

def upgrade() -> None:
    op.add_column('users', sa.Column('status', sa.String(20), nullable=True))

    conn = op.get_bind()
    total_updated = 0

    while True:
        result = conn.execute(sa.text("""
            SELECT id FROM users
            WHERE status IS NULL
            LIMIT :batch_size
            FOR UPDATE SKIP LOCKED
        """), {'batch_size': BATCH_SIZE})

        ids = [row[0] for row in result]
        if not ids:
            break

        conn.execute(sa.text("""
            UPDATE users
            SET status = CASE WHEN is_active THEN 'active' ELSE 'inactive' END
            WHERE id = ANY(:ids)
        """), {'ids': ids})

        total_updated += len(ids)
        conn.commit()  # Commit per batch to release locks

def downgrade() -> None:
    op.drop_column('users', 'status')

Concurrent Index (Zero-Downtime)

def upgrade() -> None:
    # CONCURRENTLY avoids table locks on large tables
    # IMPORTANT: Cannot run inside transaction block
    op.execute("COMMIT")
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
        ON users (organization_id, created_at DESC)
    """)

def downgrade() -> None:
    op.execute("COMMIT")
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org")

Running Async Code in Migrations

from sqlalchemy import text
from sqlalchemy.util import await_only

def upgrade() -> None:
    connection = op.get_bind()
    # Alembic runs in greenlet context, so await_only works
    result = await_only(
        connection.execute(text("SELECT count(*) FROM users"))
    )

Backfill Size Guide

| Table Size | Strategy | Notes |
| --- | --- | --- |
| < 10K rows | Single UPDATE | Fast enough |
| 10K-1M rows | Batched UPDATE in migration | LIMIT + commit per batch |
| > 1M rows | Background script + trigger | Trigger for new rows, script for old |
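All three strategies above hinge on splitting work into bounded chunks. A small planning helper — names are illustrative, and real backfills usually walk primary-key ranges (as in the FOR UPDATE SKIP LOCKED pattern) rather than offsets:

```python
def batch_ranges(min_id: int, max_id: int, batch_size: int):
    """Yield inclusive (start, end) primary-key ranges covering [min_id, max_id]."""
    start = min_id
    while start <= max_id:
        end = min(start + batch_size - 1, max_id)
        yield (start, end)
        start = end + 1

# 2,500 rows with ids 1..2500 in batches of 1000:
assert list(batch_ranges(1, 2500, 1000)) == [(1, 1000), (1001, 2000), (2001, 2500)]
```

Each (start, end) pair becomes one `UPDATE ... WHERE id BETWEEN :start AND :end` followed by a commit, keeping lock hold times short.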

Incorrect — Single-phase NOT NULL:

# Locks table for entire backfill duration
def upgrade():
    op.add_column('users', sa.Column('org_id', UUID, nullable=False))
    op.execute("UPDATE users SET org_id = 'default-uuid'")

Correct — Two-phase NOT NULL:

# Phase 1: Add as nullable, backfill
def upgrade():
    op.add_column('users', sa.Column('org_id', UUID, nullable=True))
    op.execute("UPDATE users SET org_id = 'default-uuid' WHERE org_id IS NULL")
# Phase 2 (separate migration): Add NOT NULL after verification

Common Mistakes

  • Adding NOT NULL without two-phase approach (locks entire table)
  • Using blocking index creation on large tables (use CONCURRENTLY)
  • Running CONCURRENTLY inside a transaction block (fails)
  • Not committing between batches (holds locks too long)
  • Skipping FOR UPDATE SKIP LOCKED in batch queries (deadlocks)

Select the right database engine based on workload requirements and trade-offs — HIGH

Database Selection Guide

The Default: PostgreSQL

PostgreSQL consistently ranks among the most-loved databases in developer surveys, for good reason. Start with PostgreSQL unless you have a specific, validated reason not to.

-- PostgreSQL handles "document store" workloads with JSONB
CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  metadata JSONB DEFAULT '{}'
);
CREATE INDEX idx_products_metadata ON products USING GIN (metadata);
SELECT * FROM products WHERE metadata @> '{"category": "electronics"}';

Key strengths: JSONB (90% of MongoDB use cases), full-text search (tsvector), extensions (PostGIS, pgvector, TimescaleDB), ACID transactions, universal ecosystem support.

Decision Matrix by Project Tier

| Tier | Recommendation | Rationale |
| --- | --- | --- |
| Interview / Take-home | SQLite or PostgreSQL | Zero config or standard choice |
| Hackathon / Prototype | PostgreSQL | Don't waste time on exotic choices |
| MVP (< 6 months) | PostgreSQL | One database, learn it well |
| Growth (1-5 engineers) | PostgreSQL + Redis (cache) | Add Redis only when measured |
| Enterprise (5+) | PostgreSQL primary + purpose-built secondaries | Specialized stores for validated bottlenecks |

Decision Matrix by Data Model

| Data Shape | Best Fit | Why |
| --- | --- | --- |
| Relational with joins | PostgreSQL | Built for this |
| JSON documents with queries | PostgreSQL (JSONB) | Indexed JSON with SQL power |
| Truly schema-less, evolving weekly | MongoDB | Only if you never join |
| Key-value lookups | Redis | Sub-ms reads, ephemeral data |
| Time-series metrics | PostgreSQL + TimescaleDB | Or InfluxDB for extreme scale |
| Vector embeddings | PostgreSQL + pgvector | Or Pinecone for 100M+ vectors |

When to Use Each

  • PostgreSQL: Everything, unless proven otherwise. Web apps, APIs, SaaS, complex queries, JSON, full-text search, geo, vectors.
  • MongoDB: Truly document-shaped data with no relational needs. Content where schema changes weekly AND you never join.
  • Redis: Caching, sessions, ephemeral data. Never as primary datastore.
  • SQLite: Embedded, single-user, dev/testing, edge computing. Never for concurrent multi-user writes.

Anti-Patterns

Incorrect:

  • Choosing MongoDB because "it's trendy" or "JSON is easier" without evaluating PostgreSQL JSONB
  • Premature sharding before exhausting indexing, query optimization, read replicas, connection pooling
  • SQLite in production with concurrent writes (causes SQLITE_BUSY)
  • Redis as primary datastore (data persistence is best-effort)
  • Running PostgreSQL + MongoDB + Redis + Elasticsearch when PostgreSQL alone covers it

Correct:

  • Default to PostgreSQL, add specialized stores only for validated bottlenecks
  • Before choosing non-PostgreSQL: benchmark (not assume), verify data model incompatibility, confirm team expertise
  • Use Redis only as cache layer in front of PostgreSQL
  • Use SQLite only for embedded/single-user/dev

References

  • references/postgres-vs-mongodb.md — Head-to-head comparison, JSONB examples
  • references/cost-comparison.md — Hosting costs, license, operational complexity
  • references/db-migration-paths.md — MongoDB→PostgreSQL, SQLite→PostgreSQL strategies
  • references/storage-and-cms.md — CMS backends, file/blob storage decisions

Plan migration rollbacks and monitor execution to prevent extended outages — HIGH

Migration Rollback and Monitoring

Incorrect — no rollback plan or monitoring:

-- FORBIDDEN: Constraint validation in same transaction as creation
ALTER TABLE orders ADD CONSTRAINT fk_org
FOREIGN KEY (org_id) REFERENCES orgs(id);
-- Impact: Full table scan with exclusive lock

-- FORBIDDEN: Backfill without batching
UPDATE users SET new_col = old_col;
-- Impact: Locks entire table, fills transaction log

-- FORBIDDEN: Skip environments
-- Always: local -> CI -> staging -> production

Correct — pgroll automated rollback:

# Install pgroll
brew install xataio/pgroll/pgroll

# Initialize pgroll in your database
pgroll init --postgres-url "postgres://user:pass@localhost/db"

# migrations/001_add_email_verified.json
{
  "name": "001_add_email_verified",
  "operations": [
    {
      "add_column": {
        "table": "users",
        "column": {
          "name": "email_verified",
          "type": "boolean",
          "default": "false",
          "nullable": false
        },
        "up": "false"
      }
    }
  ]
}
# Start migration (creates versioned schema)
pgroll start migrations/001_add_email_verified.json

# After verification, complete migration
pgroll complete

# Rollback if issues
pgroll rollback

Monitoring During Migration

-- Check for locks during migration
SELECT pid, now() - pg_stat_activity.query_start AS duration, query, state
FROM pg_stat_activity
WHERE (now() - pg_stat_activity.query_start) > interval '5 minutes'
AND state != 'idle';

-- Check replication lag (if using replicas)
-- pg_wal_lsn_diff works on all supported versions; direct pg_lsn subtraction needs PG 14+
SELECT client_addr, state,
       pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replication_lag_bytes
FROM pg_stat_replication;

-- Monitor backfill progress
SELECT
  COUNT(*) FILTER (WHERE display_name IS NOT NULL) as migrated,
  COUNT(*) FILTER (WHERE display_name IS NULL) as remaining,
  ROUND(100.0 * COUNT(*) FILTER (WHERE display_name IS NOT NULL) / COUNT(*), 2) as pct_complete
FROM users;

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Automated rollback | Use pgroll for dual-schema versioning |
| Verification | Check pg_stat_statements before contract phase |
| Lock monitoring | Query pg_stat_activity during migration |
| Replication | Monitor lag before completing migration |
| Environment order | local -> CI -> staging -> production (never skip) |

Apply zero-downtime migration patterns to avoid table locks and production outages — CRITICAL

Zero-Downtime Migration Patterns

Incorrect — blocking schema changes:

-- FORBIDDEN: Single-step ALTER that locks table
ALTER TABLE users RENAME COLUMN name TO full_name;
-- Impact: Blocks ALL queries during metadata lock

-- FORBIDDEN: Add NOT NULL to existing column directly
ALTER TABLE orders ADD COLUMN org_id UUID NOT NULL;
-- Impact: Fails immediately if table has data

-- FORBIDDEN: Regular CREATE INDEX on large table
CREATE INDEX idx_big_table_col ON big_table(col);
-- Impact: Locks table for minutes/hours

-- FORBIDDEN: Drop column without verification period
ALTER TABLE users DROP COLUMN legacy_field;
-- Impact: No rollback if application still references it

Correct — expand-contract pattern:

Phase 1: EXPAND              Phase 2: MIGRATE           Phase 3: CONTRACT
Add new column               Backfill data              Remove old column
(nullable)                   Update app to use new      (after app migrated)
                             Both versions work

Manual Expand Phase

-- Step 1: Add new column (nullable, no default constraint yet)
ALTER TABLE users ADD COLUMN display_name VARCHAR(200);

-- Step 2: Create trigger for dual-write (if app can't dual-write)
CREATE OR REPLACE FUNCTION sync_display_name() RETURNS TRIGGER AS $$
BEGIN
  NEW.display_name := CONCAT(NEW.first_name, ' ', NEW.last_name);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_sync_display_name
  BEFORE INSERT OR UPDATE ON users
  FOR EACH ROW EXECUTE FUNCTION sync_display_name();

-- Step 3: Backfill existing data (in batches)
UPDATE users SET display_name = CONCAT(first_name, ' ', last_name)
WHERE display_name IS NULL
AND id IN (SELECT id FROM users WHERE display_name IS NULL LIMIT 1000);

Manual Contract Phase

-- Step 1: Verify no readers of old column
SELECT * FROM pg_stat_statements
WHERE query LIKE '%first_name%' OR query LIKE '%last_name%';

-- Step 2: Drop trigger, then old columns ONLY after app fully migrated
DROP TRIGGER IF EXISTS trg_sync_display_name ON users;
ALTER TABLE users DROP COLUMN first_name;
ALTER TABLE users DROP COLUMN last_name;
ALTER TABLE users ALTER COLUMN display_name SET NOT NULL;

NOT VALID Constraint Pattern

-- Step 1: Add constraint without validating existing rows (instant)
ALTER TABLE orders ADD CONSTRAINT chk_amount_positive
CHECK (amount > 0) NOT VALID;

-- Step 2: Validate constraint (scans table but allows writes)
ALTER TABLE orders VALIDATE CONSTRAINT chk_amount_positive;

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Tool choice | pgroll for automation, manual for simple cases |
| Column rename | Add new + copy + drop old (never RENAME) |
| Constraint timing | Add NOT VALID first, VALIDATE separately |
| Rollback window | Keep old schema 24-72 hours |
| Backfill batch size | 1000-10000 rows per batch |
| Index strategy | CONCURRENTLY always |

Design database indexes to optimize query performance without slowing writes — HIGH

Schema Indexing Strategies

When to Create Indexes

-- Index foreign keys (required for join/cascade performance)
CREATE INDEX idx_orders_customer_id ON orders(customer_id);

-- Index columns in WHERE clauses
CREATE INDEX idx_users_email ON users(email);

-- Index ORDER BY / GROUP BY columns
CREATE INDEX idx_orders_created_at ON orders(created_at);

-- Composite index for multi-column queries
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status);

Composite Index Column Order

-- Good: Supports both queries
CREATE INDEX idx_orders_customer_status ON orders(customer_id, status);

-- Uses index: WHERE customer_id = 123 AND status = 'pending'
-- Uses index: WHERE customer_id = 123 (leftmost prefix)
-- Does NOT use index: WHERE status = 'pending' (not leftmost)

Rule: Put the most selective column first, or the column most frequently queried on its own.

Index Types

B-Tree (Default)

Equality, range queries, sorting.

CREATE INDEX idx_analyses_url ON analyses(url);
CREATE INDEX idx_analyses_status ON analyses(status);

GIN (Inverted Index)

Full-text search (TSVECTOR), JSONB, arrays.

CREATE INDEX idx_analyses_search_vector ON analyses USING GIN(search_vector);
CREATE INDEX idx_artifact_metadata_gin ON artifacts USING GIN(artifact_metadata);

HNSW (Vector Similarity)

Approximate nearest neighbor search (embeddings).

CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- m=16: connections per layer | ef_construction=64: build quality

Partial Indexes

Filter frequently queried subsets.

-- Only index completed analyses (common query)
CREATE INDEX idx_analyses_completed ON analyses(created_at DESC) WHERE status = 'complete';

Covering Indexes (Index-Only Scans)

Include all queried columns.

CREATE INDEX idx_analyses_status_covering
ON analyses(status, created_at DESC) INCLUDE (id, title);
-- PostgreSQL can satisfy query entirely from index

Index Maintenance

-- Find unused indexes (candidates for removal)
SELECT indexrelname, idx_scan FROM pg_stat_user_indexes
WHERE idx_scan = 0 AND indexrelname NOT LIKE '%_pkey';

-- Rebuild bloated indexes
REINDEX INDEX CONCURRENTLY idx_chunks_vector_hnsw;

Constraints as Validation

CREATE TABLE products (
  id INT PRIMARY KEY,
  price DECIMAL(10, 2) CHECK (price >= 0),
  stock INT CHECK (stock >= 0),
  discount_percent INT CHECK (discount_percent BETWEEN 0 AND 100)
);

Incorrect — Missing foreign key index:

-- FK without index causes slow joins
CREATE TABLE orders (
  id UUID PRIMARY KEY,
  customer_id UUID REFERENCES customers(id)
  -- Missing: CREATE INDEX idx_orders_customer ON orders(customer_id)
);

Correct — Index all foreign keys:

-- Index enables fast joins and cascade deletes
CREATE TABLE orders (
  id UUID PRIMARY KEY,
  customer_id UUID REFERENCES customers(id)
);
CREATE INDEX idx_orders_customer ON orders(customer_id);

Anti-Patterns

  • Over-indexing: Every index slows writes. Only index actual query patterns.
  • Missing FK indexes: Causes slow joins and cascading deletes.
  • Wrong column order: Composite indexes only use leftmost prefix.
  • FLOAT for money: Use DECIMAL(10, 2) for financial values.

Apply normalization rules to eliminate data redundancy and update anomalies — HIGH

Schema Normalization Patterns

Normal Forms

1st Normal Form (1NF)

Each column contains atomic values, no repeating groups.

-- WRONG: Multiple values in one column
CREATE TABLE orders (
  id INT PRIMARY KEY,
  product_ids VARCHAR(255)  -- '101,102,103' (bad!)
);

-- CORRECT: Separate junction table
CREATE TABLE orders (id INT PRIMARY KEY, customer_id INT);
CREATE TABLE order_items (
  id INT PRIMARY KEY,
  order_id INT REFERENCES orders(id),
  product_id INT
);

2nd Normal Form (2NF)

All non-key columns depend on the entire primary key.

-- WRONG: order_date depends only on order_id
CREATE TABLE order_items (
  order_id UUID, product_id UUID,
  order_date TIMESTAMP,  -- Partial dependency!
  PRIMARY KEY (order_id, product_id)
);

-- CORRECT: Separate tables
CREATE TABLE orders (id UUID PRIMARY KEY, order_date TIMESTAMP NOT NULL);
CREATE TABLE order_items (
  id UUID PRIMARY KEY,
  order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
  product_id UUID NOT NULL, quantity INTEGER NOT NULL CHECK (quantity > 0)
);

3rd Normal Form (3NF)

No transitive dependencies (non-key columns depend only on primary key).

-- WRONG: country_name depends on country_code
CREATE TABLE users (id UUID PRIMARY KEY, country_code TEXT, country_name TEXT);

-- CORRECT: Extract to separate table
CREATE TABLE countries (code TEXT PRIMARY KEY, name TEXT NOT NULL UNIQUE);
CREATE TABLE users (id UUID PRIMARY KEY, country_code TEXT REFERENCES countries(code));

When to Denormalize

Denormalize only after profiling shows bottlenecks.

-- Denormalized counter (faster reads)
CREATE TABLE analyses (
  id UUID PRIMARY KEY,
  artifact_count INTEGER DEFAULT 0  -- Maintained by trigger
);

CREATE FUNCTION update_artifact_count() RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'INSERT' THEN
    UPDATE analyses SET artifact_count = artifact_count + 1 WHERE id = NEW.analysis_id;
  ELSIF TG_OP = 'DELETE' THEN
    UPDATE analyses SET artifact_count = artifact_count - 1 WHERE id = OLD.analysis_id;
  END IF;
  RETURN NULL;
END; $$ LANGUAGE plpgsql;

CREATE TRIGGER trg_artifact_count
AFTER INSERT OR DELETE ON artifacts  -- assumed child table carrying analysis_id
FOR EACH ROW EXECUTE FUNCTION update_artifact_count();

JSON vs Normalized Tables

| Use JSON When | Use Normalized When |
| --- | --- |
| Schema is flexible/evolving | Need foreign key constraints |
| Data rarely queried individually | Frequent filtering/sorting |
| Structure varies per row | Complex queries (joins, aggregations) |

-- JSON: Flexible metadata
extraction_metadata JSONB  -- {"fetch_time_ms": 1234, "charset": "utf-8"}

-- Normalized: Structured queryable data
CREATE TABLE agent_findings (
  analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
  agent_type TEXT NOT NULL,
  findings JSONB NOT NULL  -- Hybrid: FK + JSONB
);

Incorrect — Violating 1NF (repeating groups):

-- Multiple values in one column
CREATE TABLE orders (
  id UUID PRIMARY KEY,
  product_ids TEXT  -- '101,102,103' stored as CSV
);

Correct — Junction table (1NF compliant):

-- Atomic values, no repeating groups
CREATE TABLE orders (id UUID PRIMARY KEY);
CREATE TABLE order_items (
  order_id UUID REFERENCES orders(id),
  product_id UUID,
  PRIMARY KEY (order_id, product_id)
);

Design Philosophy

  1. Model the domain, not the UI - Schema reflects business entities
  2. Optimize for reads OR writes - OLTP normalized, OLAP denormalized
  3. Data integrity over performance - Constraints first, optimize later
  4. Plan for scale from day one - Indexing, partitioning, caching strategy

Design NoSQL schemas with embed vs reference trade-offs for query performance — HIGH

NoSQL Schema Design Patterns

SQL vs NoSQL Decision

| Factor | SQL (PostgreSQL) | NoSQL (MongoDB) |
| --- | --- | --- |
| Data relationships | Complex, many-to-many | Simple, hierarchical |
| Query patterns | Ad-hoc, complex joins | Known access patterns |
| Consistency | Strong (ACID) | Eventual (configurable) |
| Schema | Rigid, enforced | Flexible, evolving |
| Scale | Vertical + read replicas | Horizontal sharding |

Embed vs Reference

Embed When:

  • Data is always accessed together
  • Relationship is 1:1 or 1:few
  • Embedded data rarely changes independently
{
  "_id": "user-123",
  "name": "Alice",
  "address": {
    "street": "123 Main St",
    "city": "Portland",
    "state": "OR"
  }
}

Reference When:

  • Data is accessed independently
  • Relationship is 1:many or many:many
  • Referenced data changes frequently
// users collection
{ "_id": "user-123", "name": "Alice", "org_id": "org-456" }

// organizations collection
{ "_id": "org-456", "name": "Acme Corp", "plan": "enterprise" }

Document Size Limits

| Database | Max Document Size | Recommendation |
|---|---|---|
| MongoDB | 16 MB | Keep under 1 MB for performance |
| DynamoDB | 400 KB | Split large items across multiple records |
| Firestore | 1 MB | Use subcollections for large data |
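One way to enforce the "keep well under the hard cap" recommendation is a pre-insert size guard. This is a minimal sketch: it approximates document size via its JSON encoding (BSON is typically slightly larger, so keep a safety margin), and the 1 MB soft limit is an assumption taken from the table above.

```python
import json

MAX_DOC_BYTES = 1_000_000  # ~1 MB soft limit, well under MongoDB's 16 MB hard cap

def approx_doc_size(doc: dict) -> int:
    """Approximate a document's size via its JSON encoding (lower bound for BSON)."""
    return len(json.dumps(doc).encode("utf-8"))

def check_doc_size(doc: dict) -> None:
    """Raise before inserting a document that exceeds the soft limit."""
    size = approx_doc_size(doc)
    if size > MAX_DOC_BYTES:
        raise ValueError(
            f"Document is {size} bytes; split it or move data to a referenced collection"
        )
```

Running this check in the application layer catches oversized documents before the database rejects them (or before they silently degrade performance).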

Sharding Strategy

Choose a shard key that ensures even distribution:

// Good: High cardinality, even distribution
db.orders.createIndex({ customer_id: 1, created_at: 1 });
sh.shardCollection("mydb.orders", { customer_id: "hashed" });

// Bad: Low cardinality (hot shard)
sh.shardCollection("mydb.orders", { status: 1 });  // Only 3-5 values!

Schema Validation (MongoDB)

db.createCollection("users", {
  validator: {
    $jsonSchema: {
      bsonType: "object",
      required: ["name", "email", "created_at"],
      properties: {
        name: { bsonType: "string", minLength: 1 },
        email: { bsonType: "string", pattern: "^.+@.+\\..+$" },
        status: { enum: ["active", "inactive", "suspended"] },
        created_at: { bsonType: "date" }
      }
    }
  }
});

Key Decisions

| Decision | Choice | Rationale |
|---|---|---|
| Embed vs reference | Based on access patterns | Co-located reads are faster |
| Sharding key | High cardinality, hashed | Even data distribution |
| Consistency level | Start with strong, relax if needed | Data integrity first |
| Schema validation | Always define for core collections | Catches bugs early |

Incorrect — Unbounded embedded array:

// Document grows with every new order and will eventually hit the 16 MB cap
{
  "_id": "user-123",
  "orders": [
    {"id": "order-1", "total": 100},
    {"id": "order-2", "total": 200}
    // ...thousands more over time
  ]
}

Correct — Reference pattern for 1:many:

// users collection
{"_id": "user-123", "name": "Alice"}

// orders collection (separate)
{"_id": "order-1", "user_id": "user-123", "total": 100}
{"_id": "order-2", "user_id": "user-123", "total": 200}

Common Mistakes

  • Embedding unbounded arrays (document grows forever)
  • Using sequential shard keys (hot partition)
  • Not defining schema validation (schema chaos)
  • Treating NoSQL as "no schema" (still needs design)
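A common mitigation for the unbounded-array mistake is the bucket pattern: cap how many entries a document embeds and start a new document when the cap is reached. This sketch models it in plain Python; `order_buckets` as a collection name and the bucket size are assumptions, not a prescribed schema.

```python
BUCKET_SIZE = 100  # max embedded entries per bucket document (assumed cap)

def add_to_bucket(buckets: list, user_id: str, order: dict) -> list:
    """Append an order to the user's latest bucket, starting a new bucket
    document when the current one is full.

    `buckets` stands in for documents in a hypothetical `order_buckets`
    collection, ordered oldest to newest.
    """
    if buckets and len(buckets[-1]["orders"]) < BUCKET_SIZE:
        buckets[-1]["orders"].append(order)
    else:
        buckets.append({
            "user_id": user_id,
            "bucket_no": len(buckets),
            "orders": [order],
        })
    return buckets
```

Each document stays bounded, while reads for recent activity still touch only the newest bucket.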

Track schema version history with changelogs and audit trails for rollback safety — HIGH

Versioning Changelog & Audit Trails

Schema Version Table

CREATE TABLE schema_version (
    version_id SERIAL PRIMARY KEY,
    version_number VARCHAR(20) NOT NULL,
    description TEXT NOT NULL,
    applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    applied_by VARCHAR(100),
    execution_time_ms INTEGER,
    checksum VARCHAR(64),
    CONSTRAINT uq_version_number UNIQUE (version_number)
);

Semantic Versioning for Databases

MAJOR.MINOR.PATCH

MAJOR: Breaking changes (drop tables, rename columns)
MINOR: Backward-compatible additions (new tables, nullable columns)
PATCH: Bug fixes, index changes, data migrations
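The MAJOR/MINOR/PATCH classification can be automated from the operations a migration performs. This is a hedged sketch, not part of Alembic: the operation names are illustrative labels, and which operations count as breaking is a policy choice your team should adjust.

```python
# Breaking operations force a MAJOR bump; additive ones a MINOR bump;
# everything else (indexes, data fixes) is a PATCH.
MAJOR_OPS = {"drop_table", "drop_column", "rename_column", "alter_column_type"}
MINOR_OPS = {"create_table", "add_nullable_column", "create_view"}

def bump_level(operations: list) -> str:
    """Return the semantic-version bump implied by a migration's operations."""
    if any(op in MAJOR_OPS for op in operations):
        return "MAJOR"
    if any(op in MINOR_OPS for op in operations):
        return "MINOR"
    return "PATCH"
```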

Row-Level Versioning

CREATE TABLE products (
    id UUID PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    version INTEGER NOT NULL DEFAULT 1,
    valid_from TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    valid_to TIMESTAMP WITH TIME ZONE,
    is_current BOOLEAN NOT NULL DEFAULT TRUE,
    created_by VARCHAR(100) NOT NULL,
    updated_by VARCHAR(100)
);

CREATE INDEX idx_products_current ON products (id) WHERE is_current = TRUE;
CREATE INDEX idx_products_temporal ON products (id, valid_from, valid_to);

Change Data Capture (CDC)

CREATE TABLE change_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    operation VARCHAR(10) NOT NULL,  -- INSERT, UPDATE, DELETE
    record_id UUID NOT NULL,
    old_data JSONB,
    new_data JSONB,
    changed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    changed_by VARCHAR(100),
    transaction_id BIGINT DEFAULT txid_current()
);

CREATE OR REPLACE FUNCTION log_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO change_log (table_name, operation, record_id, new_data, changed_by)
        VALUES (TG_TABLE_NAME, 'INSERT', NEW.id, to_jsonb(NEW), current_user);
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO change_log (table_name, operation, record_id, old_data, new_data, changed_by)
        VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id, to_jsonb(OLD), to_jsonb(NEW), current_user);
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO change_log (table_name, operation, record_id, old_data, changed_by)
        VALUES (TG_TABLE_NAME, 'DELETE', OLD.id, to_jsonb(OLD), current_user);
    END IF;
    RETURN COALESCE(NEW, OLD);
END; $$ LANGUAGE plpgsql;

Audit Pattern Decision Matrix

| Pattern | Use Case | Complexity |
|---|---|---|
| Row-level versioning | Simple history needs | Low |
| Temporal tables | Point-in-time queries | Medium |
| CDC (change_log) | Full audit compliance | High |

Object Versioning

-- Versioned views
CREATE VIEW orders_summary_v1 AS SELECT order_id, customer_id, total FROM orders;
CREATE VIEW orders_summary_v2 AS SELECT order_id, customer_id, total, shipping_cost FROM orders;
CREATE VIEW orders_summary AS SELECT * FROM orders_summary_v2;  -- Current alias

Incorrect — Mutable audit log:

-- Allows modification/deletion of history
CREATE TABLE audit_log (
  id SERIAL PRIMARY KEY,
  action TEXT,
  changed_at TIMESTAMP
);
-- Missing: Triggers, permissions to prevent changes

Correct — Immutable audit trail:

-- Append-only with JSONB for full history
CREATE TABLE change_log (
  id BIGSERIAL PRIMARY KEY,
  old_data JSONB,
  new_data JSONB,
  changed_at TIMESTAMP DEFAULT NOW()
);
REVOKE DELETE, UPDATE ON change_log FROM app_user;

Best Practices

| Practice | Reason |
|---|---|
| Version everything | Full traceability |
| Immutable history | Audit compliance |
| Checksum verification | Detect unauthorized changes |
| Semantic versioning | Clear impact communication |

Detect schema drift between environments using checksum verification and coordination — HIGH

Versioning Drift Detection

Multi-Environment Migration Flow

Local (dev) -> CI (test) -> Staging (preview) -> Production (live)
  alembic       alembic       alembic            alembic
  upgrade       upgrade       upgrade            upgrade
   head          head          head               head

Never skip environments. Always: local -> CI -> staging -> production.

Checksum Verification

def test_migration_checksums(alembic_config):
    """Verify migrations haven't been modified after deployment."""
    script = ScriptDirectory.from_config(alembic_config)

    for revision in script.walk_revisions():
        if revision.revision in DEPLOYED_MIGRATIONS:
            current_checksum = calculate_checksum(revision.path)
            expected_checksum = DEPLOYED_MIGRATIONS[revision.revision]
            assert current_checksum == expected_checksum, \
                f"Migration {revision.revision} was modified after deployment!"

Environment-Specific Settings

# alembic/env.py
import os

def run_migrations_online():
    env = os.getenv("ENVIRONMENT", "development")

    if env == "production":
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            transaction_per_migration=True,
            postgresql_set_session_options={
                "statement_timeout": "30s",
                "lock_timeout": "10s"
            }
        )
    else:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

Migration Locks (Prevent Concurrent Migrations)

"""Migration with advisory lock.

Prevents multiple instances from running migrations simultaneously.
"""
def upgrade():
    # Acquire advisory lock (blocks until available)
    op.execute(text("SELECT pg_advisory_lock(12345)"))

    try:
        op.create_table('new_table', ...)
    finally:
        op.execute(text("SELECT pg_advisory_unlock(12345)"))

Migration Numbering Schemes

Option 1: Sequential   001_initial.sql, 002_add_users.sql
Option 2: Timestamp    20260115120000_initial.sql
Option 3: Hybrid       2026_01_15_001_initial.sql

Recommendation: Timestamp-based (avoids conflicts in parallel development).
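Generating timestamp-based names can be scripted so every developer gets the same format. A small sketch, assuming UTC timestamps (which avoid collisions across time zones and parallel branches):

```python
from datetime import datetime, timezone
import re

def migration_filename(description: str) -> str:
    """Return a timestamp-prefixed migration name, e.g. 20260115120000_add_users.sql."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    slug = re.sub(r"[^a-z0-9]+", "_", description.lower()).strip("_")
    return f"{stamp}_{slug}.sql"
```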

Conditional Migration Logic

"""Add analytics index (production only)."""
def upgrade() -> None:
    if os.getenv('ENVIRONMENT') == 'production':
        op.execute("COMMIT")
        op.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_timestamp
            ON events (timestamp DESC)
        """)
    else:
        op.create_index('idx_events_timestamp', 'events', ['timestamp'])

Best Practices

| Practice | Reason |
|---|---|
| Always test migrations locally first | Catch errors early |
| Use transaction per migration | Atomic rollback on failure |
| Set lock timeouts in production | Prevent long-held locks |
| Never skip environments | Ensure consistency |
| Checksum deployed migrations | Detect unauthorized changes |
| Environment parity | Consistent deployments |

Incorrect — Skipping environments:

# Dangerous: Deploy to prod without staging test
alembic upgrade head  # On production DB directly

Correct — Progressive deployment:

# Safe: Test in each environment sequentially
alembic upgrade head  # Local
# CI tests pass
alembic upgrade head  # Staging
# Smoke tests pass
alembic upgrade head  # Production

Anti-Patterns

  • Modifying deployed migrations (create new migration instead)
  • Skipping staging and deploying directly to production
  • Running concurrent migrations without advisory locks
  • Versioning sensitive data in migrations (security risk)

Test rollback paths with verified downgrade procedures and documented data loss risks — HIGH

Versioning Rollback Patterns

Full Cycle Testing

# tests/test_migrations.py
import pytest
from alembic.config import Config
from alembic import command
from alembic.script import ScriptDirectory

@pytest.fixture
def alembic_config():
    return Config("alembic.ini")

def test_migrations_upgrade_downgrade(alembic_config, test_db):
    """Test all migrations can be applied and rolled back."""
    command.upgrade(alembic_config, "head")
    command.downgrade(alembic_config, "base")
    assert get_table_count(test_db) == 0

Local Rollback Verification

# Apply migration
alembic upgrade head

# Verify schema change
psql -c "\d tablename"

# Rollback migration
alembic downgrade -1

# Verify rollback complete
psql -c "\d tablename"

# Re-apply to confirm idempotency
alembic upgrade head

Data Integrity Testing

def test_migration_preserves_data(alembic_config, test_db):
    """Verify migration doesn't lose data."""
    insert_test_records(test_db, count=100)
    original_count = get_record_count(test_db)

    command.upgrade(alembic_config, "+1")

    new_count = get_record_count(test_db)
    assert new_count == original_count

Rollback Safety Check

def test_rollback_safety(alembic_config, test_db):
    """Verify rollback restores previous state."""
    command.upgrade(alembic_config, "head")
    pre_rollback_schema = get_schema_snapshot(test_db)

    apply_pending_migration(alembic_config)
    command.downgrade(alembic_config, "-1")

    post_rollback_schema = get_schema_snapshot(test_db)
    assert pre_rollback_schema == post_rollback_schema
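`get_schema_snapshot` above is a hypothetical helper. One way to sketch it: query `information_schema.columns` and normalize the rows into an order-independent structure so the equality check ignores row ordering. The SQL string and function below are an assumed implementation, not part of Alembic.

```python
SNAPSHOT_SQL = """
    SELECT table_name, column_name, data_type, is_nullable
    FROM information_schema.columns
    WHERE table_schema = 'public'
"""

def normalize_snapshot(rows) -> frozenset:
    """Turn (table, column, type, nullable) rows into a comparable,
    order-independent snapshot."""
    return frozenset(tuple(r) for r in rows)
```

`get_schema_snapshot(test_db)` would then execute `SNAPSHOT_SQL` against the database and return `normalize_snapshot(result)`.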

Documenting Destructive Rollbacks

"""Split phone_numbers column into user_phones table.

Revision ID: abc123
Revises: xyz456

WARNING: Downgrade will result in data loss. Phone number type
(mobile/home/work) cannot be restored to original format.
"""
def downgrade() -> None:
    # DESTRUCTIVE: Cannot restore original format
    op.add_column('analyses', sa.Column('phone_numbers', sa.Text, nullable=True))
    op.drop_table('user_phones')

CI Integration

# .github/workflows/migrations.yml
migration-test:
  runs-on: ubuntu-latest
  services:
    postgres:
      image: postgres:16
      env:
        POSTGRES_PASSWORD: test
  steps:
    - uses: actions/checkout@v4
    - name: Test migrations
      run: pytest tests/test_migrations.py -v

Anti-Patterns

# NEVER: Skip downgrade implementation
def downgrade():
    pass  # WRONG - implement proper rollback

# NEVER: Modify deployed migrations - create new migration instead

# NEVER: Delete migration history
command.stamp(alembic_config, "head")  # Marks revisions as applied without running them, losing the record of what actually ran

Incorrect — Empty downgrade:

def upgrade():
    op.create_table('users', ...)

def downgrade():
    pass  # Rollback won't work!

Correct — Implement downgrade:

def upgrade():
    op.create_table('users', ...)

def downgrade():
    op.drop_table('users')  # Full rollback support

Key Decisions

| Decision | Recommendation | Rationale |
|---|---|---|
| Test rollbacks | Always verify locally + CI | Prevents production surprises |
| Destructive rollbacks | Document in migration docstring | Clear risk communication |
| Production rollback | alembic downgrade -1 then investigate | Fast recovery, fix later |
| Immutable migrations | Never modify after deployment | Audit trail integrity |

References (11)

Alembic Advanced

Alembic Advanced Implementation Guide

Advanced patterns for production database migrations with Alembic.

Multi-Database Migrations

Configuration Setup

# alembic/env.py - Multi-database support
from alembic import context
from sqlalchemy import engine_from_config, pool

# Define multiple databases
DATABASES = {
    'default': 'postgresql://user:pass@localhost/main',
    'analytics': 'postgresql://user:pass@localhost/analytics',
    'audit': 'postgresql://user:pass@localhost/audit',
}

def run_migrations_online():
    """Run migrations for each database."""
    for db_name, url in DATABASES.items():
        config = context.config
        config.set_main_option('sqlalchemy.url', url)

        connectable = engine_from_config(
            config.get_section(config.config_ini_section),
            prefix='sqlalchemy.',
            poolclass=pool.NullPool,
        )

        with connectable.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=get_metadata(db_name),
                version_table=f'alembic_version_{db_name}',
            )

            with context.begin_transaction():
                context.run_migrations()

Per-Database Migrations

# migrations/versions/abc123_add_analytics_table.py
"""Add analytics events table.

Revision ID: abc123
Database: analytics
"""
from alembic import op
import sqlalchemy as sa

revision = 'abc123'
down_revision = 'xyz789'
branch_labels = ('analytics',)  # Database-specific branch

def upgrade() -> None:
    op.create_table('events',
        sa.Column('id', sa.BigInteger, primary_key=True),
        sa.Column('event_type', sa.String(100), nullable=False),
        sa.Column('payload', sa.JSON, nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )

def downgrade() -> None:
    op.drop_table('events')

Data Migrations with Batching

Batch Processing Pattern

"""Backfill user_status column with batching.

Revision ID: def456
"""
from alembic import op
import sqlalchemy as sa

BATCH_SIZE = 1000

def upgrade() -> None:
    # Phase 1: Add nullable column
    op.add_column('users', sa.Column('status', sa.String(20), nullable=True))

    # Phase 2: Backfill in batches
    conn = op.get_bind()

    total_updated = 0
    while True:
        # Fetch batch of IDs without status
        result = conn.execute(sa.text("""
            SELECT id FROM users
            WHERE status IS NULL
            LIMIT :batch_size
            FOR UPDATE SKIP LOCKED
        """), {'batch_size': BATCH_SIZE})

        ids = [row[0] for row in result]
        if not ids:
            break

        # Update batch
        conn.execute(sa.text("""
            UPDATE users
            SET status = CASE
                WHEN is_active THEN 'active'
                ELSE 'inactive'
            END
            WHERE id = ANY(:ids)
        """), {'ids': ids})

        total_updated += len(ids)
        conn.commit()  # Commit per batch to release locks

        print(f"Backfilled {total_updated} rows...")

    # Phase 3: Add NOT NULL constraint (separate migration recommended)

def downgrade() -> None:
    op.drop_column('users', 'status')

Branch Management and Merging

Creating Migration Branches

# Create a feature branch
alembic revision --branch-label=feature_payments -m "start payments feature"

# Create revision on branch
alembic revision --head=feature_payments@head -m "add payment_methods table"

# View branch structure
alembic branches

# Merge branches before deployment
alembic merge feature_payments@head main@head -m "merge payments feature"

Resolving Branch Conflicts

"""Merge feature_payments and main branches.

Revision ID: merge_abc
Revises: ('abc123', 'def456')
"""
revision = 'merge_abc'
down_revision = ('abc123', 'def456')  # Tuple for merge

def upgrade() -> None:
    # No operations - just marks merge point
    pass

def downgrade() -> None:
    # Cannot downgrade past merge - requires specific branch
    raise Exception("Cannot downgrade past merge point")

Online Schema Changes (CONCURRENTLY)

Index Creation Without Locks

"""Add index concurrently on large table.

Revision ID: idx123
"""
from alembic import op

# CRITICAL: Disable transaction for CONCURRENTLY operations
def upgrade() -> None:
    # Exit transaction block
    op.execute("COMMIT")

    # Create index without locking reads/writes
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_customer_date
        ON orders (customer_id, created_at DESC)
        WHERE status != 'cancelled'
    """)

def downgrade() -> None:
    op.execute("COMMIT")
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_orders_customer_date")

Column Rename (Expand-Contract Pattern)

"""Phase 1: Add new column alongside old.

Revision ID: rename_phase1
"""
from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    # Add new column
    op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))

    # Create trigger to sync during transition
    op.execute("""
        CREATE OR REPLACE FUNCTION sync_user_name()
        RETURNS TRIGGER AS $$
        BEGIN
            IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
                NEW.full_name = COALESCE(NEW.full_name, NEW.name);
                NEW.name = COALESCE(NEW.name, NEW.full_name);
            END IF;
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER trg_sync_user_name
        BEFORE INSERT OR UPDATE ON users
        FOR EACH ROW EXECUTE FUNCTION sync_user_name();
    """)

def downgrade() -> None:
    op.execute("DROP TRIGGER IF EXISTS trg_sync_user_name ON users")
    op.execute("DROP FUNCTION IF EXISTS sync_user_name()")
    op.drop_column('users', 'full_name')

Environment-Specific Migrations

Conditional Migration Logic

"""Add analytics index (production only).

Revision ID: prod_idx
"""
from alembic import op
import os

def upgrade() -> None:
    # Only create expensive index in production
    if os.getenv('ENVIRONMENT') == 'production':
        op.execute("COMMIT")
        op.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_timestamp
            ON events (timestamp DESC)
        """)
    else:
        # Simpler non-concurrent for dev/test
        op.create_index('idx_events_timestamp', 'events', ['timestamp'])

def downgrade() -> None:
    if os.getenv('ENVIRONMENT') == 'production':
        op.execute("COMMIT")
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_events_timestamp")
    else:
        op.drop_index('idx_events_timestamp', table_name='events')

Migration Hooks

Pre/Post Migration Callbacks

# alembic/env.py
from alembic import context

def before_migration(ctx, revision, heads):
    """Run before each migration."""
    print(f"Starting migration: {revision}")
    # Notify monitoring, acquire locks, etc.

def after_migration(ctx, revision, heads):
    """Run after each migration."""
    print(f"Completed migration: {revision}")
    # Clear caches, notify services, etc.

context.configure(
    # ... other config ...
    on_version_apply=after_migration,
)

Audit Trails

Database Audit Trail Patterns

Row-Level Versioning

CREATE TABLE products (
    id UUID PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,

    -- Versioning columns
    version INTEGER NOT NULL DEFAULT 1,
    valid_from TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    valid_to TIMESTAMP WITH TIME ZONE,
    is_current BOOLEAN NOT NULL DEFAULT TRUE,

    -- Audit columns
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    created_by VARCHAR(100) NOT NULL,
    updated_at TIMESTAMP WITH TIME ZONE,
    updated_by VARCHAR(100)
);

-- Index for current records
CREATE INDEX idx_products_current ON products (id) WHERE is_current = TRUE;

-- Index for temporal queries
CREATE INDEX idx_products_temporal ON products (id, valid_from, valid_to);

Temporal Tables (temporal_tables extension)

-- Enable temporal tables extension
CREATE EXTENSION IF NOT EXISTS temporal_tables;

-- Create temporal table
CREATE TABLE products (
    id UUID PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    sys_period TSTZRANGE NOT NULL DEFAULT tstzrange(NOW(), NULL)
);

-- History table
CREATE TABLE products_history (LIKE products);

-- Trigger for automatic versioning
CREATE TRIGGER versioning_trigger
BEFORE INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION versioning(
    'sys_period', 'products_history', true
);

-- Query at a point in time
SELECT * FROM products
WHERE sys_period @> '2026-01-15 10:00:00+00'::timestamptz;

Change Data Capture (CDC)

-- CDC table for tracking all changes
CREATE TABLE change_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    operation VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
    record_id UUID NOT NULL,
    old_data JSONB,
    new_data JSONB,
    changed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    changed_by VARCHAR(100),
    transaction_id BIGINT DEFAULT txid_current()
);

-- Generic trigger function
CREATE OR REPLACE FUNCTION log_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO change_log (table_name, operation, record_id, new_data, changed_by)
        VALUES (TG_TABLE_NAME, 'INSERT', NEW.id, to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO change_log (table_name, operation, record_id, old_data, new_data, changed_by)
        VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id, to_jsonb(OLD), to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO change_log (table_name, operation, record_id, old_data, changed_by)
        VALUES (TG_TABLE_NAME, 'DELETE', OLD.id, to_jsonb(OLD), current_user);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

-- Apply to tables
CREATE TRIGGER products_audit
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION log_changes();

Decision Matrix

| Pattern | Use Case | Complexity |
|---|---|---|
| Row-level versioning | Simple history needs | Low |
| Temporal tables | Point-in-time queries | Medium |
| CDC | Full audit compliance | High |

Cost Comparison

Database Cost Comparison

Hosting and operational cost analysis to inform database selection decisions.

Managed Hosting Costs (Approximate Monthly)

| Tier | PostgreSQL | MongoDB | Redis |
|---|---|---|---|
| Free | Supabase, Neon, Render | Atlas M0 (512 MB) | Upstash (10K cmds/day) |
| Hobby ($5-25) | Supabase Pro ($25), Railway ($5+), Render ($7+) | Atlas M10 ($57+) | Upstash pay-as-you-go, Redis Cloud ($5+) |
| Production ($50-200) | Supabase Pro, RDS db.t3.medium ($65), Cloud SQL ($50) | Atlas M30 ($230+) | ElastiCache t3.small ($25), Memorystore ($35) |
| Scale ($200-1000) | RDS db.r6g.large ($200), Aurora ($250) | Atlas M50 ($500+) | ElastiCache r6g.large (~$150) |

Key takeaway: MongoDB managed hosting costs 2-3x more than PostgreSQL at equivalent tiers. MongoDB Atlas pricing reflects the vendor lock-in premium.

License Considerations

| Database | License | Impact |
|---|---|---|
| PostgreSQL | PostgreSQL License | Fully permissive. Host anywhere, modify freely. |
| MongoDB | SSPL | Cannot offer MongoDB as a managed service. Limits cloud provider hosting options. |
| Redis | RSALv2 + SSPLv1 (since 2024) | Source-available but not OSS. Alternatives: Valkey (Linux Foundation fork), KeyDB, DragonflyDB. |
| SQLite | Public domain | Zero restrictions. Embedded in everything. |

Operational Complexity

| Factor | PostgreSQL | MongoDB | Redis | SQLite |
|---|---|---|---|---|
| Backup/restore | pg_dump, pg_basebackup, WAL archiving | mongodump, oplog | RDB snapshots, AOF | File copy |
| Monitoring | pg_stat_statements, pgBadger | Atlas monitoring, mongotop | redis-cli INFO, RedisInsight | N/A |
| Scaling reads | Read replicas (simple) | Replica sets (moderate) | Redis Cluster (moderate) | N/A |
| Scaling writes | Partitioning, Citus (moderate) | Sharding (complex) | Redis Cluster (moderate) | N/A |
| Team expertise needed | Moderate (widely known) | Moderate (less common) | Low (simple API) | Minimal |
| Connection pooling | PgBouncer (essential at scale) | Built-in driver pooling | Built-in | N/A |

Total Cost of Ownership Factors

Beyond hosting, consider:

  1. Developer time: PostgreSQL has more tutorials, Stack Overflow answers, and ORM support than any alternative
  2. Hiring: PostgreSQL/SQL skills are universal; MongoDB-specific expertise is niche
  3. Migration cost: Starting with PostgreSQL avoids expensive future migrations
  4. Extension ecosystem: PostGIS, pgvector, TimescaleDB, pg_cron are free — equivalent MongoDB features require paid Atlas tiers
  5. Vendor lock-in: MongoDB Atlas features (Atlas Search, Charts, App Services) don't transfer to self-hosted

Budget Decision Tree

Budget = $0?
  YES --> SQLite (embedded) or Supabase/Neon free tier (managed Postgres)
  NO  -->

Budget < $50/mo?
  YES --> Managed PostgreSQL (Supabase, Railway, Render)
  NO  -->

Budget < $500/mo?
  YES --> PostgreSQL (managed) + Redis (Upstash or small ElastiCache)
  NO  -->

Budget $500+/mo?
  YES --> PostgreSQL (RDS/Aurora/Cloud SQL) + Redis + purpose-built stores as needed

Db Migration Paths

Database Migration Paths

Common migration scenarios, tools, and risk assessment.

Migration Risk Matrix

| Migration | Difficulty | Risk | Downtime | Typical Duration |
|---|---|---|---|---|
| SQLite to PostgreSQL | Low | Low | Minutes | 1-2 days dev work |
| MongoDB to PostgreSQL | Medium-High | Medium | Hours (with planning) | 1-4 weeks |
| MySQL to PostgreSQL | Medium | Low-Medium | Hours | 1-2 weeks |
| PostgreSQL version upgrade | Low | Low | Minutes (pg_upgrade) | Hours |
| Single PostgreSQL to read replicas | Low | Low | Near-zero | 1-2 days |
| Redis cache swap (e.g., to Valkey) | Low | Low | Minutes | 1 day |

SQLite to PostgreSQL

The most common migration path for growing projects.

When: App outgrows single-user/embedded model and needs concurrent writes.

Steps:

  1. Install PostgreSQL and create target database
  2. Use pgloader (recommended) for automated schema + data migration
  3. Update connection string and ORM configuration
  4. Replace SQLite-specific syntax (e.g., AUTOINCREMENT to SERIAL)
  5. Add connection pooling (PgBouncer for production)
  6. Test concurrent write scenarios

Tools: pgloader (handles schema conversion automatically), sqlite3 .dump + manual SQL cleanup.

Common gotchas:

  • SQLite INTEGER PRIMARY KEY is auto-increment; PostgreSQL needs SERIAL or GENERATED ALWAYS AS IDENTITY
  • SQLite has loose typing; PostgreSQL enforces column types strictly
  • Date/time handling differs — SQLite stores as text, PostgreSQL has native types
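The date/time gotcha in particular usually needs a transform step during the data load. A minimal sketch, assuming the SQLite column holds text timestamps in one of the common formats; extend the format list to match what your data actually contains:

```python
from datetime import datetime

def parse_sqlite_timestamp(value: str) -> datetime:
    """Parse a SQLite text timestamp before loading it into a
    PostgreSQL TIMESTAMPTZ column."""
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unrecognized SQLite timestamp: {value!r}")
```

Failing loudly on unrecognized values is deliberate: silently loading garbage dates defeats the point of moving to strict typing.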

MongoDB to PostgreSQL

The highest-effort migration covered here. Plan carefully.

When: Team realizes relational queries, JOINs, or ACID transactions are needed.

Strategy:

  1. Schema mapping: Map each collection to a table. For truly variable documents, use a typed columns + JSONB hybrid:

    CREATE TABLE products (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        name TEXT NOT NULL,
        category TEXT NOT NULL,
        price NUMERIC(10,2) NOT NULL,
        metadata JSONB  -- Variable fields go here
    );
  2. Data extraction: mongoexport --jsonArray per collection, transform with scripts, load with COPY

  3. Query rewriting: Convert aggregation pipelines to SQL

    • $match becomes WHERE
    • $group becomes GROUP BY
    • $lookup becomes JOIN
    • $unwind becomes LATERAL JOIN or jsonb_array_elements
  4. Dual-write period: Write to both databases during transition, read from PostgreSQL, compare results

  5. Cutover: Switch reads to PostgreSQL, decommission MongoDB

Tools: mongoexport, custom ETL scripts (Python recommended), pgloader (limited MongoDB support).

Risk mitigations:

  • Run dual-write for at least 1 week in production
  • Compare query results between both databases automatically
  • Keep MongoDB running (read-only) for 30 days post-migration as rollback
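The automated comparison during the dual-write period can be sketched as a keyed diff of the two datasets. This is an illustrative helper, not a real tool: it normalizes MongoDB's `_id` to the relational key name so both sides line up, and reports keys missing on either side plus value mismatches.

```python
def diff_records(mongo_docs: list, pg_rows: list, key: str = "id") -> dict:
    """Compare dual-written datasets by primary key."""
    def index(records):
        # Map each record's key (falling back to Mongo's _id) to its
        # remaining fields, so the two sides are directly comparable.
        return {
            str(r.get(key, r.get("_id"))): {
                k: v for k, v in r.items() if k not in ("_id", key)
            }
            for r in records
        }
    m, p = index(mongo_docs), index(pg_rows)
    return {
        "missing_in_pg": sorted(m.keys() - p.keys()),
        "missing_in_mongo": sorted(p.keys() - m.keys()),
        "mismatched": sorted(k for k in m.keys() & p.keys() if m[k] != p[k]),
    }
```

A nightly job running this over a sample of records gives early warning that the dual-write paths have diverged.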

MySQL to PostgreSQL

When: Need advanced features (JSONB, CTEs, window functions, extensions) or better standards compliance.

Steps:

  1. Use pgloader for automated migration (handles most type conversions)
  2. Review and fix: ENUM types, UNSIGNED integers, AUTO_INCREMENT to SERIAL
  3. Replace MySQL-specific functions (IFNULL to COALESCE, LIMIT x,y to LIMIT y OFFSET x)
  4. Update stored procedures (PL/pgSQL syntax differs from MySQL procedures)

Tools: pgloader (best option), AWS DMS (for RDS-to-RDS), mysqldump + manual conversion.

Adding Read Replicas (PostgreSQL)

When: Read-heavy workload saturates primary, measured (not assumed).

Steps:

  1. Set up streaming replication (primary_conninfo in postgresql.auto.conf plus a standby.signal file; PostgreSQL 11 and earlier used recovery.conf)
  2. Configure application for read/write splitting (write to primary, read from replica)
  3. Handle replication lag in application logic (eventual consistency for reads)
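Steps 2 and 3 can be combined in a small routing layer. A sketch under assumptions: `current_lag` would be refreshed by polling `pg_stat_replication` (not shown), and the class and threshold are illustrative, not a library API.

```python
class ReadWriteRouter:
    """Route writes to the primary and reads to a replica, falling back
    to the primary when the caller needs fresh data or measured
    replication lag exceeds the tolerance."""

    def __init__(self, max_lag_seconds: float = 1.0):
        self.max_lag_seconds = max_lag_seconds
        self.current_lag = 0.0  # refreshed externally, e.g. from pg_stat_replication

    def target(self, operation: str, require_fresh: bool = False) -> str:
        if operation == "write" or require_fresh:
            return "primary"
        if self.current_lag > self.max_lag_seconds:
            return "primary"
        return "replica"
```

The `require_fresh` flag handles read-your-own-writes cases (e.g. reading a record immediately after creating it) where replica lag would surface stale data.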

Tools: Built-in streaming replication, Patroni (HA), PgBouncer (connection routing).

Environment Coordination

Environment Coordination Patterns

Multi-Environment Migration Flow

┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Local     │───>│     CI      │───>│   Staging   │───>│ Production  │
│   (dev)     │    │   (test)    │    │   (preview) │    │   (live)    │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │                  │
       v                  v                  v                  v
   alembic            alembic           alembic            alembic
   upgrade            upgrade           upgrade            upgrade
    head               head              head               head

Environment-Specific Settings

# alembic/env.py
import os

def run_migrations_online():
    env = os.getenv("ENVIRONMENT", "development")

    if env == "production":
        # Use statement timeout for safety
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            transaction_per_migration=True,
            postgresql_set_session_options={
                "statement_timeout": "30s",
                "lock_timeout": "10s"
            }
        )
    else:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

Migration Locks (Prevent Concurrent Migrations)

"""Migration with advisory lock.

Prevents multiple instances from running migrations simultaneously.
"""
from alembic import op
from sqlalchemy import text

def upgrade():
    # Acquire advisory lock (blocks until available)
    op.execute(text("SELECT pg_advisory_lock(12345)"))

    try:
        op.create_table('new_table', ...)
    finally:
        # Release lock
        op.execute(text("SELECT pg_advisory_unlock(12345)"))

Migration Numbering Schemes

Option 1: Sequential
001_initial_schema.sql
002_add_users.sql
003_add_orders.sql

Option 2: Timestamp
20260115120000_initial_schema.sql
20260116143000_add_users.sql
20260117091500_add_orders.sql

Option 3: Hybrid (Date + Sequence)
2026_01_15_001_initial_schema.sql
2026_01_15_002_add_users.sql
2026_01_16_001_add_orders.sql
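The three schemes can be sketched as a small Python helper (illustrative only; migration_name is not part of any migration tool):

```python
from datetime import datetime

def migration_name(scheme: str, ts: datetime, seq: int, slug: str) -> str:
    """Build a migration filename in one of the three schemes above."""
    if scheme == "sequential":
        return f"{seq:03d}_{slug}.sql"
    if scheme == "timestamp":
        return f"{ts:%Y%m%d%H%M%S}_{slug}.sql"
    if scheme == "hybrid":
        return f"{ts:%Y_%m_%d}_{seq:03d}_{slug}.sql"
    raise ValueError(f"unknown scheme: {scheme}")

# Reproduces the Option 2 example above
print(migration_name("timestamp", datetime(2026, 1, 15, 12, 0, 0), 1, "initial_schema"))
# → 20260115120000_initial_schema.sql
```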

Best Practices

| Practice | Reason |
|---|---|
| Always test migrations locally first | Catch errors early |
| Use transaction per migration | Atomic rollback on failure |
| Set lock timeouts in production | Prevent long-held locks |
| Never skip environments | Ensure consistency |

Migration Patterns

Database Migration Patterns

Overview

Database migrations evolve schema over time while preserving data and minimizing downtime. This guide covers zero-downtime strategies, backfill patterns, rollback planning, and Alembic best practices.


Zero-Downtime Migrations

The Problem

Traditional migrations lock tables during schema changes, causing downtime for users.

The Solution: Multi-Phase Migrations

Phase 1: Add New Column (Nullable)

-- Migration 1: Add column without constraints
ALTER TABLE analyses
ADD COLUMN content_summary TEXT;  -- NULL allowed during transition

Why nullable? Existing rows don't have values yet. Adding NOT NULL would fail.

Phase 2: Backfill Data

-- Migration 2: Populate new column
UPDATE analyses
SET content_summary = LEFT(raw_content, 500)
WHERE content_summary IS NULL
  AND raw_content IS NOT NULL;

-- For large tables, batch updates to avoid long locks
DO $$
DECLARE
    batch_size INTEGER := 1000;
    updated_rows INTEGER;
BEGIN
    LOOP
        UPDATE analyses
        SET content_summary = LEFT(raw_content, 500)
        WHERE id IN (
            SELECT id FROM analyses
            WHERE content_summary IS NULL AND raw_content IS NOT NULL
            LIMIT batch_size
        );

        GET DIAGNOSTICS updated_rows = ROW_COUNT;
        EXIT WHEN updated_rows = 0;

        -- Brief pause to allow other transactions
        PERFORM pg_sleep(0.1);
    END LOOP;
END $$;

Phase 3: Add Constraints

-- Migration 3: Add NOT NULL constraint (safe after backfill)
ALTER TABLE analyses
ALTER COLUMN content_summary SET NOT NULL;

-- Add default for new rows
ALTER TABLE analyses
ALTER COLUMN content_summary SET DEFAULT '';

Real-World Example: OrchestKit's PII Columns

Migration: 20251210_add_pii_columns.py

def upgrade() -> None:
    # Phase 1: Add nullable columns
    op.execute(text("""
        ALTER TABLE analysis_chunks
        ADD COLUMN IF NOT EXISTS pii_flag BOOLEAN DEFAULT FALSE,
        ADD COLUMN IF NOT EXISTS pii_types JSONB
    """))

    # Phase 2: Backfill (all existing chunks have pii_flag = FALSE)
    op.execute(text("""
        UPDATE analysis_chunks
        SET pii_flag = FALSE
        WHERE pii_flag IS NULL
    """))

    # Phase 3: Add NOT NULL constraint
    op.execute(text("""
        ALTER TABLE analysis_chunks
        ALTER COLUMN pii_flag SET NOT NULL
    """))

Backfill Strategies

Small Tables (< 10,000 rows)

Strategy: Single UPDATE statement.

-- Fast enough for small tables
UPDATE artifacts
SET version = 1
WHERE version IS NULL;

Medium Tables (10,000 - 1,000,000 rows)

Strategy: Batched updates with progress tracking.

# Alembic migration
import sqlalchemy as sa
from alembic import op
from sqlalchemy import text

def upgrade() -> None:
    # Add column
    op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=True))

    # Batched backfill
    connection = op.get_bind()
    batch_size = 1000

    while True:
        result = connection.execute(text("""
            UPDATE analyses
            SET content_summary = LEFT(raw_content, 500)
            WHERE id IN (
                SELECT id FROM analyses
                WHERE content_summary IS NULL AND raw_content IS NOT NULL
                LIMIT :batch_size
            )
        """), {"batch_size": batch_size})

        if result.rowcount == 0:
            break

        print(f"Backfilled {result.rowcount} rows")

Large Tables (> 1,000,000 rows)

Strategy: Background job + application-level backfill.

# Step 1: Add nullable column (migration)
from sqlalchemy.dialects.postgresql import TSVECTOR

def upgrade() -> None:
    op.add_column('analysis_chunks', sa.Column('content_tsvector', TSVECTOR, nullable=True))

# Step 2: Create trigger for new rows (migration)
def upgrade() -> None:
    op.execute(text("""
        CREATE FUNCTION chunks_tsvector_update() RETURNS TRIGGER AS $$
        BEGIN
            NEW.content_tsvector :=
                setweight(to_tsvector('english', COALESCE(NEW.section_title, '')), 'A') ||
                setweight(to_tsvector('english', COALESCE(NEW.snippet, '')), 'B');
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER chunks_tsvector_trigger
        BEFORE INSERT OR UPDATE OF section_title, snippet ON analysis_chunks
        FOR EACH ROW EXECUTE FUNCTION chunks_tsvector_update();
    """))

# Step 3: Backfill old rows (background script, not migration)
# Run: poetry run python scripts/backfill_chunks_tsvector.py
async def backfill_tsvector():
    batch_size = 5000
    total_backfilled = 0

    while True:
        async with get_db() as db:
            result = await db.execute(text("""
                UPDATE analysis_chunks
                SET content_tsvector =
                    setweight(to_tsvector('english', COALESCE(section_title, '')), 'A') ||
                    setweight(to_tsvector('english', COALESCE(snippet, '')), 'B')
                WHERE id IN (
                    SELECT id FROM analysis_chunks
                    WHERE content_tsvector IS NULL
                    LIMIT :batch_size
                )
            """), {"batch_size": batch_size})

            await db.commit()

            if result.rowcount == 0:
                break

            total_backfilled += result.rowcount
            print(f"Backfilled {total_backfilled} total rows")

OrchestKit Migration: 20251218_backfill_chunks_tsvector.py (actual example)


Rollback Planning

Always Include downgrade()

Every migration MUST have a rollback path.

def upgrade() -> None:
    op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=True))

def downgrade() -> None:
    op.drop_column('analyses', 'content_summary')

Destructive Rollbacks (Data Loss)

Warning: Some rollbacks cannot preserve data.

def upgrade() -> None:
    # Split 'phone_numbers' TEXT into separate table
    op.create_table('user_phones', ...)
    # Migrate data from analyses.phone_numbers to user_phones
    op.drop_column('analyses', 'phone_numbers')

def downgrade() -> None:
    # DESTRUCTIVE: Cannot restore original 'phone_numbers' format
    op.add_column('analyses', sa.Column('phone_numbers', sa.Text, nullable=True))
    # Drop user_phones table (data loss!)
    op.drop_table('user_phones')

Solution: Document data loss in migration docstring.

"""Split phone_numbers column into user_phones table.

Revision ID: abc123
Revises: xyz456
Create Date: 2025-12-21

WARNING: Downgrade will result in data loss. Phone number type
(mobile/home/work) cannot be restored to original format.
"""

Testing Rollbacks

Always test both upgrade and downgrade paths:

# Apply migration
poetry run alembic upgrade head

# Test rollback
poetry run alembic downgrade -1

# Re-apply
poetry run alembic upgrade head

Alembic Best Practices

Migration File Naming

OrchestKit uses timestamp-based naming for clarity:

20251210_harden_embedding_pipeline.py
20251218_backfill_chunks_tsvector.py

Format: YYYYMMDD_description.py or YYYYMMDDHHmmss_description.py. Setting file_template in alembic.ini (for example, file_template = %%(year)d%%(month).2d%%(day).2d_%%(slug)s) makes alembic revision generate this format automatically.

Use text() for Raw SQL

Wrap PostgreSQL-specific raw SQL (triggers, constraints, indexes) in text() to make the raw-SQL intent explicit and to support bound parameters.

from sqlalchemy import text

def upgrade() -> None:
    # WRONG: Incomplete statement (missing ON <table>), fails with a syntax error
    op.execute("""
        CREATE INDEX idx_vector USING hnsw (vector vector_cosine_ops)
    """)

    # CORRECT: Complete statement wrapped in text()
    op.execute(text("""
        CREATE INDEX idx_vector
        ON analysis_chunks
        USING hnsw (vector vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """))

Idempotent Migrations

Make migrations safe to re-run (important for development):

def upgrade() -> None:
    # Add column only if it doesn't exist
    op.execute(text("""
        ALTER TABLE analyses
        ADD COLUMN IF NOT EXISTS content_summary TEXT
    """))

    # Create index only if it doesn't exist
    op.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_analyses_url
        ON analyses(url)
    """))

    # Add constraint with existence check
    op.execute(text("""
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM pg_constraint WHERE conname = 'chk_granularity'
            ) THEN
                ALTER TABLE analysis_chunks
                ADD CONSTRAINT chk_granularity
                CHECK (granularity IN ('coarse', 'fine', 'summary'));
            END IF;
        END $$;
    """))

Separate Index Creation

For large tables, create indexes separately (not in transaction):

# Migration creates table without index
poetry run alembic upgrade head

# Manually create index CONCURRENTLY (no locks)
psql -d orchestkit -c "CREATE INDEX CONCURRENTLY idx_chunks_vector_hnsw ON analysis_chunks USING hnsw (vector vector_cosine_ops) WITH (m = 16, ef_construction = 64);"

Why? CREATE INDEX CONCURRENTLY cannot run inside a transaction block, and Alembic runs each migration in a transaction by default. Alternatively, op.get_context().autocommit_block() (Alembic 1.2+) lets a migration temporarily step outside its transaction to run such statements.

Foreign Key Constraints with Cascades

Always specify ON DELETE behavior:

def upgrade() -> None:
    # WRONG: No cascade (orphaned records)
    op.create_foreign_key(
        'fk_artifacts_analysis',
        'artifacts', 'analyses',
        ['analysis_id'], ['id']
    )

    # CORRECT: Cascade deletes
    op.execute(text("""
        ALTER TABLE artifacts
        ADD CONSTRAINT fk_artifacts_analysis
        FOREIGN KEY (analysis_id) REFERENCES analyses(id)
        ON DELETE CASCADE
    """))

OrchestKit Example: 20251210_add_cascade_delete.py

def upgrade() -> None:
    # Drop old constraint
    op.execute(text("""
        ALTER TABLE analysis_chunks
        DROP CONSTRAINT IF EXISTS analysis_chunks_analysis_id_fkey
    """))

    # Add new constraint with CASCADE
    op.execute(text("""
        ALTER TABLE analysis_chunks
        ADD CONSTRAINT analysis_chunks_analysis_id_fkey
        FOREIGN KEY (analysis_id) REFERENCES analyses(id)
        ON DELETE CASCADE
    """))

Trigger Functions

Pattern: Create reusable functions for common triggers.

def upgrade() -> None:
    # Reusable function for updated_at
    op.execute(text("""
        CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$
        BEGIN
            NEW.updated_at = NOW();
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
    """))

    # Apply to multiple tables
    for table in ['analyses', 'artifacts', 'analysis_chunks']:
        op.execute(text(f"""
            CREATE TRIGGER update_{table}_updated_at
            BEFORE UPDATE ON {table}
            FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
        """))

Migration Dependencies

Handle branching with merge migrations:

# Two developers create migrations in parallel
# branch-a: revision = "abc123", down_revision = "xyz456"
# branch-b: revision = "def789", down_revision = "xyz456"

# Create merge migration
poetry run alembic merge -m "merge sprint 12 migrations" abc123 def789

# Generates:
# revision = "merge123"
# down_revision = ("abc123", "def789")

OrchestKit Example: 500313d1cac9_merge_sprint_12_migrations.py


Common Pitfalls

1. Adding NOT NULL Without Default

# WRONG: Fails if table has existing rows
def upgrade():
    op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=False))

# CORRECT: Add with default or nullable first
def upgrade():
    op.add_column('analyses', sa.Column('content_summary', sa.Text, nullable=True))
    # Backfill data...
    op.alter_column('analyses', 'content_summary', nullable=False)

2. Renaming Columns Without Downtime

# Multi-phase approach:
# Phase 1: Add new column, populate from old column
op.add_column('analyses', sa.Column('url', sa.Text, nullable=True))
op.execute(text("UPDATE analyses SET url = old_url WHERE url IS NULL"))

# Deploy application code that reads/writes both columns

# Phase 2: Drop old column (after code deployment)
op.drop_column('analyses', 'old_url')

3. Changing Column Types

# WRONG: Fails if data is incompatible
op.alter_column('analyses', 'status', type_=sa.Integer)

# CORRECT: Multi-step with validation
op.add_column('analyses', sa.Column('status_new', sa.Integer, nullable=True))
op.execute(text("""
    UPDATE analyses SET status_new =
    CASE status
        WHEN 'pending' THEN 0
        WHEN 'complete' THEN 1
        WHEN 'failed' THEN 2
    END
"""))
op.drop_column('analyses', 'status')
op.alter_column('analyses', 'status_new', new_column_name='status')
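Before running the CASE backfill, it helps to verify that every existing value maps cleanly; rows missed by the CASE would silently become NULL. A hedged Python sketch mirroring the mapping above (validate_statuses is a hypothetical helper):

```python
# Mirrors the CASE expression above; extend before migrating if needed
STATUS_MAP = {"pending": 0, "complete": 1, "failed": 2}

def validate_statuses(values):
    """Return the sorted set of unmapped values; non-empty means the
    CASE backfill would leave NULLs behind."""
    return sorted({v for v in values if v not in STATUS_MAP})
```

Run this against SELECT DISTINCT status before the migration; a non-empty result means the CASE expression needs more branches.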

Summary

| Pattern | Use When | Example |
|---|---|---|
| Zero-Downtime | Production systems | Add nullable → backfill → add constraint |
| Batched Backfill | Medium tables (10K-1M rows) | UPDATE in batches with LIMIT |
| Background Backfill | Large tables (>1M rows) | Trigger for new rows + script for old |
| Idempotent Migrations | Development environments | IF NOT EXISTS checks |
| CASCADE Deletes | Parent-child relationships | ON DELETE CASCADE |
| Triggers | Auto-update computed fields | updated_at, search_vector, tsvector |
| Merge Migrations | Parallel development | Combine divergent branches |

Golden Rules:

  1. Always test upgrade() and downgrade() locally
  2. Never add NOT NULL without backfilling first
  3. Use text() for PostgreSQL-specific SQL
  4. Document destructive rollbacks
  5. Create indexes CONCURRENTLY for large tables

Migration Testing

Migration Testing Patterns

Full Cycle Testing

# tests/test_migrations.py
import pytest
from alembic.config import Config
from alembic import command
from alembic.script import ScriptDirectory

@pytest.fixture
def alembic_config():
    return Config("alembic.ini")

def test_migrations_upgrade_downgrade(alembic_config, test_db):
    """Test all migrations can be applied and rolled back."""
    # Apply all migrations
    command.upgrade(alembic_config, "head")

    # Downgrade all migrations
    command.downgrade(alembic_config, "base")

    # Verify clean state
    assert get_table_count(test_db) == 0

Checksum Verification

def test_migration_checksums(alembic_config):
    """Verify migrations haven't been modified after deployment."""
    script = ScriptDirectory.from_config(alembic_config)

    for revision in script.walk_revisions():
        if revision.revision in DEPLOYED_MIGRATIONS:
            current_checksum = calculate_checksum(revision.path)
            expected_checksum = DEPLOYED_MIGRATIONS[revision.revision]
            assert current_checksum == expected_checksum, \
                f"Migration {revision.revision} was modified after deployment!"
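The calculate_checksum helper referenced above is not shown; a minimal sketch, assuming a hex SHA-256 digest over the migration file's bytes:

```python
import hashlib

def calculate_checksum(path: str) -> str:
    """Hex SHA-256 of a migration file's raw bytes."""
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()
```

DEPLOYED_MIGRATIONS would then map revision IDs to digests recorded at deploy time.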

Data Integrity Testing

def test_migration_preserves_data(alembic_config, test_db):
    """Verify migration doesn't lose data."""
    # Insert test data
    insert_test_records(test_db, count=100)
    original_count = get_record_count(test_db)

    # Run migration
    command.upgrade(alembic_config, "+1")

    # Verify data preserved
    new_count = get_record_count(test_db)
    assert new_count == original_count

Rollback Testing

def test_rollback_safety(alembic_config, test_db):
    """Verify rollback restores previous state."""
    # Get initial state
    command.upgrade(alembic_config, "head")
    pre_rollback_schema = get_schema_snapshot(test_db)

    # Apply new migration
    apply_pending_migration(alembic_config)

    # Rollback
    command.downgrade(alembic_config, "-1")

    # Verify schema restored
    post_rollback_schema = get_schema_snapshot(test_db)
    assert pre_rollback_schema == post_rollback_schema

CI Integration

# .github/workflows/migrations.yml
migration-test:
  runs-on: ubuntu-latest
  services:
    postgres:
      image: postgres:16
      env:
        POSTGRES_PASSWORD: test
  steps:
    - uses: actions/checkout@v4
    - name: Test migrations
      run: |
        pytest tests/test_migrations.py -v

Normalization Patterns

Database Normalization Patterns

Overview

Normalization is the process of organizing data to reduce redundancy and improve data integrity. This guide covers normal forms, denormalization strategies, and modern patterns like JSON columns.


Normal Forms (1NF through BCNF)

First Normal Form (1NF)

Rule: Each column contains atomic (indivisible) values, and each row is unique.

Anti-pattern:

-- WRONG: Multiple values in one column
CREATE TABLE users (
    id UUID PRIMARY KEY,
    phone_numbers TEXT  -- "555-1234, 555-5678, 555-9012"
);

Correct:

-- RIGHT: Atomic values only
CREATE TABLE users (
    id UUID PRIMARY KEY,
    name TEXT NOT NULL
);

CREATE TABLE user_phones (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    phone_number TEXT NOT NULL,
    phone_type TEXT CHECK (phone_type IN ('mobile', 'home', 'work'))
);

Second Normal Form (2NF)

Rule: Must be in 1NF + no partial dependencies (all non-key columns depend on the entire primary key).

Anti-pattern:

-- WRONG: order_date depends only on order_id, not on (order_id, product_id)
CREATE TABLE order_items (
    order_id UUID,
    product_id UUID,
    order_date TIMESTAMP,  -- Partial dependency!
    quantity INTEGER,
    PRIMARY KEY (order_id, product_id)
);

Correct:

-- RIGHT: Separate tables for independent entities
CREATE TABLE orders (
    id UUID PRIMARY KEY,
    order_date TIMESTAMP NOT NULL,
    customer_id UUID NOT NULL
);

CREATE TABLE order_items (
    id UUID PRIMARY KEY,
    order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
    product_id UUID NOT NULL REFERENCES products(id),
    quantity INTEGER NOT NULL CHECK (quantity > 0)
);

Third Normal Form (3NF)

Rule: Must be in 2NF + no transitive dependencies (non-key columns depend only on the primary key).

Anti-pattern:

-- WRONG: country_name depends on country_code, not on user_id
CREATE TABLE users (
    id UUID PRIMARY KEY,
    name TEXT NOT NULL,
    country_code TEXT,
    country_name TEXT  -- Transitive dependency!
);

Correct:

-- RIGHT: Extract country data to separate table
CREATE TABLE countries (
    code TEXT PRIMARY KEY,
    name TEXT NOT NULL UNIQUE
);

CREATE TABLE users (
    id UUID PRIMARY KEY,
    name TEXT NOT NULL,
    country_code TEXT REFERENCES countries(code)
);

Boyce-Codd Normal Form (BCNF)

Rule: Must be in 3NF + every determinant is a candidate key.

When to use: Rare edge cases with overlapping candidate keys. Most applications stop at 3NF.


When to Denormalize for Performance

Read-Heavy Workloads

Pattern: Denormalize frequently joined data to reduce query complexity.

-- Normalized (slower reads, cleaner updates)
CREATE TABLE analyses (
    id UUID PRIMARY KEY,
    url TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL
);

CREATE TABLE artifacts (
    id UUID PRIMARY KEY,
    analysis_id UUID REFERENCES analyses(id) ON DELETE CASCADE,
    markdown_content TEXT NOT NULL
);

-- Denormalized (faster reads, requires sync logic)
CREATE TABLE analyses (
    id UUID PRIMARY KEY,
    url TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    artifact_count INTEGER DEFAULT 0  -- Denormalized counter
);

-- Update trigger to maintain artifact_count
CREATE FUNCTION update_artifact_count() RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE analyses SET artifact_count = artifact_count + 1
        WHERE id = NEW.analysis_id;
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE analyses SET artifact_count = artifact_count - 1
        WHERE id = OLD.analysis_id;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER artifact_count_trigger
AFTER INSERT OR DELETE ON artifacts
FOR EACH ROW EXECUTE FUNCTION update_artifact_count();

Computed Aggregates

OrchestKit Example: Download counts on artifacts table.

-- Denormalized counter (fast reads, eventual consistency)
CREATE TABLE artifacts (
    id UUID PRIMARY KEY,
    markdown_content TEXT NOT NULL,
    download_count INTEGER DEFAULT 0 NOT NULL
);

-- Increment without locks (eventual consistency)
UPDATE artifacts SET download_count = download_count + 1 WHERE id = :artifact_id;

When NOT to denormalize:

  • Frequent writes to denormalized fields (update overhead)
  • Complex sync logic prone to bugs
  • Data integrity is critical (financial, medical)

JSON Columns vs Normalized Tables

Use JSON Columns When:

  1. Schema is flexible/evolving (e.g., extraction metadata, API responses)
  2. Data is rarely queried individually (stored as opaque blob)
  3. Structure varies per row (e.g., different content types)

OrchestKit Examples:

-- extraction_metadata: Flexible schema, rarely queried
CREATE TABLE analyses (
    id UUID PRIMARY KEY,
    url TEXT NOT NULL,
    extraction_metadata JSONB  -- {"fetch_time_ms": 1234, "charset": "utf-8"}
);

-- artifact_metadata: Tags, topics, complexity (flexible)
CREATE TABLE artifacts (
    id UUID PRIMARY KEY,
    artifact_metadata JSONB  -- {"topics": ["RAG", "LangGraph"], "complexity": "intermediate"}
);

-- Query JSONB with operators
SELECT * FROM artifacts
WHERE artifact_metadata @> '{"topics": ["RAG"]}'::jsonb;

-- Index JSONB for performance
CREATE INDEX idx_artifact_metadata_gin ON artifacts USING GIN (artifact_metadata);

Use Normalized Tables When:

  1. Need foreign key constraints (referential integrity)
  2. Frequent filtering/sorting on individual fields
  3. Complex queries (joins, aggregations)

OrchestKit Example:

-- agent_findings: Structured data with foreign key
CREATE TABLE agent_findings (
    id UUID PRIMARY KEY,
    analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
    agent_type TEXT NOT NULL,
    findings JSONB NOT NULL,  -- Hybrid: FK + JSONB
    confidence_score FLOAT
);

-- analysis_chunks: Fully normalized for vector search
CREATE TABLE analysis_chunks (
    id UUID PRIMARY KEY,
    analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,
    section_title TEXT,
    snippet TEXT,
    vector VECTOR(1536),  -- Cannot use JSONB for vector ops
    content_tsvector TSVECTOR  -- Cannot use JSONB for full-text search
);

Indexing Strategies

B-Tree Indexes (Default)

Use for: Equality, range queries, sorting.

-- Single-column indexes
CREATE INDEX idx_analyses_url ON analyses(url);
CREATE INDEX idx_analyses_status ON analyses(status);

-- Composite indexes (order matters!)
CREATE INDEX idx_chunks_analysis_granularity
ON analysis_chunks(analysis_id, granularity);

-- Query uses index: WHERE analysis_id = X AND granularity = Y
-- Query uses index (partial): WHERE analysis_id = X
-- Query does NOT use index: WHERE granularity = Y (wrong column order)

Rule of Thumb: Put high-selectivity columns first (columns that filter out the most rows).
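The rule of thumb can be made concrete with a toy selectivity measure (illustrative Python, not a database API; the sample data is invented):

```python
def selectivity(sample):
    """Fraction of distinct values in a column sample.
    Closer to 1.0 means the column filters out more rows,
    so it belongs earlier in a composite index."""
    sample = list(sample)
    return len(set(sample)) / len(sample)

analysis_ids = ["a1", "a2", "a3", "a4"]                  # nearly unique per row
granularities = ["coarse", "fine", "coarse", "coarse"]   # few distinct values

# analysis_id is more selective, so (analysis_id, granularity) is the right order
assert selectivity(analysis_ids) > selectivity(granularities)
```

In practice, n_distinct from pg_stats gives the same signal on real tables.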

GIN Indexes (Inverted Indexes)

Use for: Full-text search (TSVECTOR), JSONB, arrays.

-- Full-text search
CREATE INDEX idx_analyses_search_vector
ON analyses USING GIN(search_vector);

-- JSONB containment queries
CREATE INDEX idx_artifact_metadata_gin
ON artifacts USING GIN(artifact_metadata);

-- Array containment
CREATE INDEX idx_tags_gin
ON articles USING GIN(tags);

HNSW Indexes (Vector Similarity)

Use for: Approximate nearest neighbor search (embeddings).

-- OrchestKit: Semantic search on chunks
CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Parameters:
-- m = 16: Connections per layer (higher = more accurate, slower)
-- ef_construction = 64: Build quality (higher = better recall, slower indexing)

Partial Indexes

Use for: Filter frequently queried subsets.

-- Only index completed analyses (common query pattern)
CREATE INDEX idx_analyses_completed
ON analyses(created_at)
WHERE status = 'complete';

-- Only index chunks with content_type (90% have it)
CREATE INDEX idx_chunks_content_type_created
ON analysis_chunks(content_type, created_at DESC)
WHERE content_type IS NOT NULL;

Covering Indexes (Index-Only Scans)

Use for: Include all queried columns in the index.

-- Query: SELECT id, title FROM analyses WHERE status = 'complete' ORDER BY created_at DESC
CREATE INDEX idx_analyses_status_covering
ON analyses(status, created_at DESC)
INCLUDE (id, title);

-- PostgreSQL can satisfy entire query from index (no table access)

Index Maintenance

Monitoring Index Usage

-- Unused indexes (candidates for removal)
SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE idx_scan = 0
  AND indexname NOT LIKE '%_pkey';

-- Index size
SELECT indexname, pg_size_pretty(pg_relation_size(indexname::regclass))
FROM pg_indexes
WHERE schemaname = 'public';

Reindexing

-- Rebuild index (fixes bloat, updates statistics)
REINDEX INDEX CONCURRENTLY idx_chunks_vector_hnsw;

-- Rebuild all indexes on table
REINDEX TABLE CONCURRENTLY analysis_chunks;

Anti-Patterns to Avoid

Over-Indexing

Problem: Every index slows down writes (INSERT/UPDATE/DELETE).

Rule: Only create indexes for actual query patterns. Remove unused indexes.

JSONB for Everything

Problem: Loss of type safety, no foreign keys, harder to query.

Rule: Use normalized tables for structured, queryable data. JSONB for flexible/opaque data.

Premature Denormalization

Problem: Complexity without proven performance need.

Rule: Start normalized. Denormalize only after profiling shows bottlenecks.

Missing Indexes on Foreign Keys

Problem: Slow joins and cascading deletes.

-- ALWAYS index foreign keys
CREATE TABLE artifacts (
    id UUID PRIMARY KEY,
    analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE
);

-- REQUIRED for performance
CREATE INDEX idx_artifacts_analysis_id ON artifacts(analysis_id);

Summary

| Pattern | Use When | Example |
|---|---|---|
| 3NF | Default for structured data | users, orders, products |
| Denormalize | Read-heavy, proven bottleneck | download_count, artifact_count |
| JSONB | Flexible schema, opaque data | extraction_metadata, API responses |
| Normalized Tables | Foreign keys, complex queries | analysis_chunks, agent_findings |
| B-Tree Index | Equality, ranges, sorting | status, created_at, foreign keys |
| GIN Index | Full-text, JSONB, arrays | search_vector, metadata |
| HNSW Index | Vector similarity (embeddings) | vector columns |
| Partial Index | Frequent filtered queries | WHERE status = 'complete' |

Golden Rule: Start normalized (3NF), measure performance, then selectively denormalize based on evidence.

Object Versioning

Database Object Versioning

Stored Procedure Versioning

-- Version tracking for stored procedures
CREATE TABLE procedure_versions (
    id SERIAL PRIMARY KEY,
    procedure_name VARCHAR(200) NOT NULL,
    version VARCHAR(20) NOT NULL,
    definition TEXT NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by VARCHAR(100),
    is_current BOOLEAN DEFAULT TRUE,

    CONSTRAINT uq_procedure_version UNIQUE (procedure_name, version)
);

-- Before updating a procedure, archive it
INSERT INTO procedure_versions (procedure_name, version, definition, created_by)
SELECT
    'calculate_order_total',
    '1.0.0',
    pg_get_functiondef(oid),
    current_user
FROM pg_proc
WHERE proname = 'calculate_order_total';

View Versioning

-- Versioned views with _v1, _v2 naming
CREATE VIEW orders_summary_v1 AS
SELECT order_id, customer_id, total
FROM orders;

CREATE VIEW orders_summary_v2 AS
SELECT order_id, customer_id, total, shipping_cost, tax
FROM orders;

-- Current version alias
CREATE VIEW orders_summary AS
SELECT * FROM orders_summary_v2;

Reference Data Versioning

"""Reference data migration with versioning."""
from alembic import op
from sqlalchemy.sql import table, column
import sqlalchemy as sa

revision = 'ref001'

status_codes = table(
    'status_codes',
    column('code', sa.String),
    column('name', sa.String),
    column('description', sa.String),
    column('version', sa.Integer),
    column('is_active', sa.Boolean)
)

def upgrade():
    # Deactivate old version
    op.execute(
        status_codes.update()
        .where(status_codes.c.version == 1)
        .values(is_active=False)
    )

    # Insert new version
    op.bulk_insert(status_codes, [
        {'code': 'PENDING', 'name': 'Pending', 'version': 2, 'is_active': True},
        {'code': 'PROCESSING', 'name': 'Processing', 'version': 2, 'is_active': True},
        {'code': 'COMPLETED', 'name': 'Completed', 'version': 2, 'is_active': True},
        {'code': 'FAILED', 'name': 'Failed', 'version': 2, 'is_active': True},
    ])

def downgrade():
    op.execute(status_codes.delete().where(status_codes.c.version == 2))
    op.execute(status_codes.update().where(status_codes.c.version == 1).values(is_active=True))

Versioning Strategy

| Object Type | Strategy | Notes |
|---|---|---|
| Procedures | Version table | Archive before update |
| Views | _v1, _v2 naming | Alias for current |
| Reference data | Version column | Soft delete old |

PostgreSQL vs MongoDB

PostgreSQL vs MongoDB

Head-to-head comparison for the most common database decision. TL;DR: PostgreSQL wins for almost every use case.

Feature Comparison

| Feature | PostgreSQL | MongoDB |
|---|---|---|
| Data model | Relational + JSONB | Document (BSON) |
| Schema | Enforced (with flexible JSONB) | Schema-less (optional validation) |
| Transactions | Full ACID, multi-table | Multi-document (since 4.0, slower) |
| Joins | Native, optimized | $lookup (slow, limited) |
| JSON support | JSONB with GIN indexes | Native BSON |
| Full-text search | Built-in tsvector | Atlas Search (paid) |
| Geospatial | PostGIS (industry standard) | Built-in (basic) |
| Aggregation | SQL (window functions, CTEs) | Aggregation pipeline (verbose) |
| Replication | Streaming replication | Replica sets |
| Sharding | Citus extension / partitioning | Built-in (complex to operate) |
| License | PostgreSQL License (permissive) | SSPL (restrictive) |
| Hosting | Every cloud, many managed options | Atlas (MongoDB Inc.) or self-host |

PostgreSQL JSONB: The MongoDB Killer

PostgreSQL's JSONB type handles document workloads with full SQL power:

-- Store JSON documents
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    data JSONB NOT NULL
);

-- Index nested fields
CREATE INDEX idx_products_category ON products USING GIN ((data->'category'));

-- Query JSON with SQL
SELECT data->>'name' AS name, data->'price' AS price
FROM products
WHERE data @> '{"category": "electronics"}'
  AND (data->>'price')::numeric < 500
ORDER BY (data->>'price')::numeric;

-- Combine relational joins WITH JSON queries
SELECT u.email, p.data->>'name'
FROM users u
JOIN products p ON p.data->>'seller_id' = u.id::text
WHERE p.data @> '{"in_stock": true}';

Key advantages over MongoDB:

  • ACID transactions across relational and JSON data in one query
  • JOIN JSON documents with relational tables
  • GIN indexes on JSONB are fast and flexible
  • Partial indexes on JSON fields for targeted performance
  • No vendor lock-in — SSPL license limits MongoDB hosting options

When MongoDB Actually Makes Sense

These cases are rare but legitimate:

  1. Rapidly evolving schemas: Data shape changes multiple times per week, and you never need cross-document joins. Example: IoT with hundreds of device types each sending different telemetry shapes.

  2. Existing MongoDB codebase: Migration cost exceeds benefit. The team has deep MongoDB operational expertise and the application works well.

  3. Content catalogs with embedded data: Self-contained documents that are always read/written as a whole, never joined. Example: a product catalog where each product document contains all its variants, reviews, and metadata.

  4. Time-series with variable schemas: Each data point has different fields. Note: TimescaleDB (PostgreSQL extension) often handles this better.

Common MongoDB Pitfalls

| Pitfall | What Happens | PostgreSQL Equivalent |
|---|---|---|
| No joins | Denormalize everything, data duplication | Native JOINs |
| Schema drift | Documents with inconsistent fields | Schema enforcement |
| $lookup performance | Cross-collection queries are slow | Optimized JOIN planner |
| Transaction overhead | Multi-doc transactions are 2-5x slower than single-doc | Negligible transaction overhead |
| SSPL license | Cannot offer as managed service | Permissive license |
| ObjectId ordering | Time-based, leaks creation timestamps | UUID v7 or SERIAL |

Migration: MongoDB to PostgreSQL

See references/migration-paths.md for detailed migration strategies.

Quick summary:

  1. Map collections to tables (or JSONB columns for truly flexible data)
  2. Extract commonly queried fields into typed columns
  3. Use mongodump + transformation scripts + COPY for data migration
  4. Rewrite aggregation pipelines as SQL queries (usually simpler)
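Step 3 of the summary above can be sketched in Python. The field list and function names here are hypothetical; a real migration would stream `mongodump` output instead of holding documents in memory, and the target column list must match your schema.

```python
import csv
import io
import json

# Assumed "hot" fields promoted to typed columns; everything else
# stays in a JSONB catch-all column.
TYPED_FIELDS = ("name", "price", "category")

def doc_to_row(doc: dict) -> list:
    """Map one MongoDB document to a COPY-ready CSV row."""
    doc = dict(doc)
    doc.pop("_id", None)  # let PostgreSQL assign its own UUID key
    typed = [str(doc.pop(f, "")) for f in TYPED_FIELDS]
    return typed + [json.dumps(doc)]  # leftover fields -> JSONB column

def docs_to_copy_csv(docs: list) -> str:
    """Render documents as CSV for COPY ... WITH (FORMAT csv)."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    for doc in docs:
        writer.writerow(doc_to_row(doc))
    return buf.getvalue()
```

The output would then be fed to something like `COPY products (name, price, category, data) FROM STDIN WITH (FORMAT csv)` through your driver's COPY support.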

Storage and CMS

Storage and CMS Database Selection

Guidance for choosing databases in content management and file storage contexts.

CMS Backend Database Selection

| CMS Type | Recommended Database | Rationale |
| --- | --- | --- |
| Traditional CMS (WordPress-like) | PostgreSQL | Structured content with relationships, taxonomies, user roles |
| Headless CMS (API-first) | PostgreSQL | Content types, versioning, localization all benefit from the relational model |
| Blog / documentation | PostgreSQL or SQLite | Simple schema; SQLite works for single-author static generation |
| E-commerce catalog | PostgreSQL | Products, variants, inventory, orders are heavily relational |
| User-generated content | PostgreSQL | Moderation workflows, reporting, search need JOINs and transactions |

Why Not MongoDB for CMS?

The "documents for content" intuition is wrong. CMS content is deeply relational:

  • Content references other content (related posts, linked products)
  • Taxonomies (categories, tags) are many-to-many relationships
  • User roles and permissions are relational
  • Content versioning needs transactions
  • Localization multiplies content with locale relationships

PostgreSQL JSONB handles the flexible parts (custom fields, metadata) while maintaining relational integrity for the structural parts.

File and Blob Storage

Rule: Never store large files in a database.

| Storage Type | Use | Technology |
| --- | --- | --- |
| File metadata | Database (PostgreSQL) | Store filename, size, MIME type, S3 key, upload timestamp |
| Small files (< 1 MB) | Database acceptable | Profile avatars, thumbnails: BYTEA column or base64 in JSONB |
| Medium files (1-100 MB) | Object storage | S3, GCS, R2, MinIO; store URL/key in database |
| Large files (100 MB+) | Object storage + multipart | S3 multipart upload, presigned URLs |
| Temporary files | Object storage with lifecycle | S3 lifecycle rules for auto-deletion |

User Upload --> API Server --> Object Storage (S3/R2/GCS)
                    |
                    v
              PostgreSQL (metadata only)
              - file_id, s3_key, filename
              - mime_type, size_bytes
              - uploaded_by, uploaded_at

This pattern:

  • Keeps database small and fast (metadata only)
  • Leverages CDN for file delivery (CloudFront, Cloudflare)
  • Enables presigned URLs for direct upload/download (bypasses API server)
  • Works with any object storage provider (no vendor lock-in)
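A minimal sketch of the metadata-only record from the diagram above. The column names mirror the diagram; the key layout (`uploads/{user}/{uuid}/{name}`) is an assumption, not a fixed convention.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class FileMetadata:
    """What the database stores; the bytes themselves live in object storage."""
    filename: str
    mime_type: str
    size_bytes: int
    uploaded_by: str
    file_id: uuid.UUID = field(default_factory=uuid.uuid4)
    uploaded_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @property
    def s3_key(self) -> str:
        # Assumed layout: scoping keys by uploader simplifies bulk cleanup.
        return f"uploads/{self.uploaded_by}/{self.file_id}/{self.filename}"

meta = FileMetadata("report.pdf", "application/pdf", 48_213, "user-42")
```

The `s3_key` is what gets handed to the object storage SDK for presigned upload/download URLs, while the row itself stays small enough to index and join freely.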

When to Use Database for Storage

Acceptable cases for storing data directly in PostgreSQL:

  1. Small, frequently accessed blobs: User avatars under 100 KB, stored as BYTEA
  2. Generated documents: PDF invoices, reports — if total volume is small (< 10 GB)
  3. Configuration files: YAML/JSON configs under 1 MB
  4. Embedded SQLite: Single-user apps where adding object storage is overkill

Object Storage Comparison

| Provider | Free Tier | Cost (per GB/mo) | Egress | Best For |
| --- | --- | --- | --- | --- |
| Cloudflare R2 | 10 GB | $0.015 | Free | Cost-sensitive, no egress fees |
| AWS S3 | 5 GB (12 months) | $0.023 | $0.09/GB | AWS ecosystem, mature tooling |
| GCS | 5 GB | $0.020 | $0.12/GB | GCP ecosystem |
| MinIO | Unlimited (self-hosted) | Infrastructure cost | N/A | On-premise, air-gapped |
| Supabase Storage | 1 GB | $0.021 | Included in plan | Already using Supabase |

Default recommendation: Cloudflare R2 for new projects (zero egress fees), S3 for AWS-native stacks.
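To make the egress point concrete, here is a toy cost model using the rates from the table above. Real bills also include request fees and storage tiering, which this ignores.

```python
def monthly_cost(storage_gb: float, egress_gb: float,
                 storage_rate: float, egress_rate: float) -> float:
    """Storage + egress cost per month in USD (request fees ignored)."""
    return storage_gb * storage_rate + egress_gb * egress_rate

# 500 GB stored, 2 TB served per month:
r2 = monthly_cost(500, 2000, 0.015, 0.0)   # R2: egress is free
s3 = monthly_cost(500, 2000, 0.023, 0.09)  # S3: $0.09/GB egress
```

With a read-heavy workload like this, egress dominates the S3 bill, which is why zero-egress providers win for content delivery even when per-GB storage prices are similar.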


Checklists (2)

Migration Checklist

Migration Deployment Checklist

Verification steps for safe database migration deployment.

Pre-Deployment Checks

Code Review

  • Migration file has descriptive docstring with purpose
  • revision and down_revision are correct
  • upgrade() contains all necessary changes
  • downgrade() properly reverses all changes (tested!)
  • No hardcoded environment-specific values
  • Large table operations use CONCURRENTLY where applicable

Schema Validation

  • Run alembic check - no pending model changes
  • Generate SQL: alembic upgrade head --sql > migration.sql
  • Review generated SQL for unexpected operations
  • Verify column types match SQLAlchemy model definitions
  • Check constraint names follow naming conventions

Backward Compatibility

  • New columns are nullable=True or have server_default
  • No column/table renames (use expand-contract pattern)
  • No NOT NULL constraints added to existing columns with data
  • Application code works with both old and new schema
  • API responses unchanged (or versioned)

Rollback Testing

Local Rollback Verification

# Apply migration
alembic upgrade head

# Verify schema change
psql -c "\d tablename"

# Rollback migration
alembic downgrade -1

# Verify rollback complete
psql -c "\d tablename"

# Re-apply to confirm idempotency
alembic upgrade head

Rollback Checklist

  • alembic downgrade -1 succeeds without errors
  • Data is preserved after rollback (if applicable)
  • Indexes and constraints are properly removed
  • Triggers and functions are cleaned up
  • Application functions correctly after rollback

Data Backup Verification

Before Production Migration

  • Full database backup completed
  • Backup verified (can restore to test environment)
  • Point-in-time recovery configured (if using RDS/Cloud SQL)
  • Backup retention policy confirmed (minimum 7 days)
  • Document backup timestamp and location

Backup Commands

# PostgreSQL backup
pg_dump -Fc -v -h $DB_HOST -U $DB_USER -d $DB_NAME > backup_$(date +%Y%m%d_%H%M%S).dump

# Verify backup
pg_restore --list backup_*.dump | head -20

# Test restore to separate database
createdb restore_test
pg_restore -d restore_test backup_*.dump

Production Deployment Steps

1. Pre-Flight (T-30 minutes)

  • Notify team of upcoming migration window
  • Verify backup completed successfully
  • Check current migration version: alembic current
  • Review migration history: alembic history -v
  • Confirm rollback plan documented

2. Deployment Execution

# Generate SQL for final review
alembic upgrade head --sql > /tmp/migration_$(date +%Y%m%d).sql

# Review SQL one more time
cat /tmp/migration_$(date +%Y%m%d).sql

# Apply migration with timing
time alembic upgrade head

# Verify new version
alembic current

3. Post-Migration Verification

  • Check alembic current shows expected revision
  • Verify schema changes with \d tablename
  • Run smoke tests against API endpoints
  • Check application logs for database errors
  • Monitor database metrics (connections, query latency)
  • Verify no increase in error rates

4. Rollback Procedure (If Needed)

# Immediate rollback
alembic downgrade -1

# Verify rollback
alembic current

# Notify team of rollback
# Investigate and fix before retry

Large Table Migration Checklist

Additional Checks for Tables > 1M Rows

  • Estimated migration duration calculated
  • CONCURRENTLY used for index operations
  • Batch processing implemented for data migrations
  • Lock wait timeout configured: SET lock_timeout = '5s'
  • Statement timeout configured: SET statement_timeout = '30m'
  • Maintenance window scheduled (if blocking operations)

Monitoring During Migration

  • Active queries: SELECT * FROM pg_stat_activity WHERE state = 'active'
  • Lock monitoring: SELECT * FROM pg_locks WHERE NOT granted
  • Table bloat after migration
  • Replication lag (if applicable)

Emergency Contacts

| Role | Contact | Escalation |
| --- | --- | --- |
| DBA On-Call | [Slack/Phone] | Database issues |
| Backend Lead | [Slack/Phone] | Application issues |
| Infrastructure | [Slack/Phone] | Connection/network |

Post-Deployment Tasks

  • Update documentation if schema changed significantly
  • Close related tickets/issues
  • Schedule VACUUM ANALYZE if large changes
  • Archive migration SQL for audit trail
  • Confirm monitoring alerts are not firing

Schema Design Checklist

Database Schema Design Checklist

Use this checklist when designing or reviewing database schemas.


Pre-Design

  • Requirements Gathered: Understand data entities and relationships
  • Access Patterns Identified: Know how data will be queried
  • SQL vs NoSQL Decision: Chosen appropriate database type
  • Scale Estimate: Expected data volume and growth rate
  • Read/Write Ratio: Understand if read-heavy or write-heavy

Normalization (SQL Databases)

  • 1NF: Atomic values, no repeating groups
  • 2NF: No partial dependencies on composite keys
  • 3NF: No transitive dependencies
  • Denormalization Justified: If denormalized, reason documented

Table Design

Primary Keys

  • Primary Key Defined: Every table has primary key
  • Key Type Chosen: INT auto-increment or UUID
  • Meaningful Keys Avoided: Not using email/username as primary key

Data Types

  • Appropriate Types: Correct data types for each column
  • String Sizes: VARCHAR sized appropriately (not always 255)
  • Numeric Precision: DECIMAL for money, INT for counts
  • Dates in UTC: TIMESTAMP for datetime columns
  • Boolean Type: Using BOOLEAN or TINYINT(1)

Constraints

  • NOT NULL: Required columns marked NOT NULL
  • Unique Constraints: Unique columns (email, username)
  • Check Constraints: Validation rules (price >= 0)
  • Default Values: Sensible defaults where appropriate

Relationships

Foreign Keys

  • Foreign Keys Defined: All relationships have FK constraints
  • ON DELETE Strategy: CASCADE, RESTRICT, SET NULL chosen appropriately
  • ON UPDATE Strategy: Usually CASCADE
  • Indexes on Foreign Keys: FKs are indexed

Relationship Types

  • One-to-Many: Modeled correctly
  • Many-to-Many: Junction table created
  • Self-Referencing: Parent-child relationships handled
  • Polymorphic: Strategy chosen (separate FKs or type+id)

Indexing

Index Strategy

  • Primary Key Indexed: Automatic, verify
  • Foreign Keys Indexed: All FKs have indexes
  • WHERE Columns: Columns in WHERE clauses indexed
  • ORDER BY Columns: Sort columns indexed
  • Composite Indexes: Multi-column queries optimized
  • Column Order: Most selective/queried column first

Index Types

  • B-Tree: Used for ranges and equality (default)
  • Hash: Used for exact matches only (if applicable)
  • Full-Text: Used for text search (if needed)
  • Partial Indexes: Conditional indexes (PostgreSQL)

Index Limits

  • Not Over-Indexed: Only necessary indexes created
  • Index Maintenance: Aware of write performance impact

Performance Considerations

Query Optimization

  • Joins Optimized: N+1 queries avoided
  • SELECT * Avoided: Only fetch needed columns
  • Pagination: LIMIT/OFFSET or cursor-based
  • Aggregations: Pre-calculated for expensive queries

Scalability

  • Sharding Strategy: Planned for large datasets
  • Partitioning: Tables partitioned by date/range (if applicable)
  • Read Replicas: Planned for read-heavy workloads
  • Caching Layer: Application-level caching considered

Data Integrity

Validation

  • Database-Level Validation: Constraints enforce rules
  • Application-Level Validation: Additional checks in code
  • Foreign Key Constraints: Referential integrity enforced
  • Unique Constraints: Duplicate prevention

Consistency

  • Transactions: ACID properties for critical operations
  • Cascading Deletes: Data cleanup strategy
  • Soft Deletes: deleted_at column if needed
  • Audit Trail: created_at, updated_at timestamps

Migrations

Migration Safety

  • Backward Compatible: New columns nullable initially
  • Up and Down Migrations: Rollback scripts provided
  • Data Migrations Separate: Schema vs data changes separated
  • Tested on Staging: Migrations tested on production copy
  • Duration Estimated: Large migrations timed

Zero-Downtime

  • Add Before Remove: New column added before removing old
  • Gradual Rollout: Multi-step migrations for large changes
  • Feature Flags: Large changes behind flags

Security

Access Control

  • Principle of Least Privilege: Users have minimum permissions
  • Separate Accounts: Read-only vs read-write accounts
  • No Root Access: Application doesn't use root/admin account

Data Protection

  • Sensitive Data Encrypted: Passwords hashed, PII encrypted
  • SQL Injection Prevention: Parameterized queries only
  • No Secrets in Code: Credentials in environment variables

NoSQL Considerations (if applicable)

Document Design

  • Embed vs Reference: Strategy chosen based on access patterns
  • Document Size: Within limits (16 MB for MongoDB)
  • Schema Validation: Validation rules defined (if supported)
  • Indexes: Appropriate indexes on query fields

Scaling

  • Sharding Key: Chosen for even distribution
  • Replication: Read replicas configured
  • Consistency Level: Chosen appropriately (eventual vs strong)

Documentation

  • ERD Created: Entity-relationship diagram
  • Schema Documented: Column descriptions and purpose
  • Indexes Documented: Why each index exists
  • Relationships Explained: Business logic behind relationships
  • Migration History: Changelog of schema changes

Testing

  • Sample Data: Test data created
  • Query Performance: Slow queries identified (EXPLAIN)
  • Load Testing: Performance under expected load
  • Edge Cases: NULL, empty, max values tested

Common Pitfalls to Avoid

  • ❌ Using VARCHAR(255) for everything
  • ❌ No foreign key constraints
  • ❌ Missing indexes on foreign keys
  • ❌ Over-indexing (index on every column)
  • ❌ FLOAT for money values
  • ❌ Storing dates as strings
  • ❌ No created_at/updated_at timestamps
  • ❌ No soft delete strategy
  • ❌ Non-reversible migrations
  • ❌ Breaking changes without migration plan

Pre-Deployment Checklist

  • Migrations Reviewed: Peer-reviewed by team
  • Rollback Plan: Tested rollback procedures
  • Monitoring Setup: Slow query logging enabled
  • Backups: Backup before major schema changes
  • Alerts Configured: Alerts for query performance degradation

Checklist Version: 1.0.0
Skill: database-schema-designer v1.0.0
Last Updated: 2025-10-31


Examples (2)

Alembic Examples

Alembic Migration Examples

Complete, production-ready migration examples.

Add Column with Default Value

"""Add organization_id to users with default.

Revision ID: add_org_001
Revises: previous_rev
Create Date: 2026-01-15
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

revision = 'add_org_001'
down_revision = 'previous_rev'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Add column as nullable first (no lock)
    op.add_column('users',
        sa.Column('organization_id', UUID(as_uuid=True), nullable=True)
    )

    # Add foreign key constraint
    op.create_foreign_key(
        'fk_users_organization',
        'users', 'organizations',
        ['organization_id'], ['id'],
        ondelete='SET NULL'
    )

    # Backfill with default organization (in separate transaction for large tables)
    op.execute("""
        UPDATE users
        SET organization_id = (SELECT id FROM organizations WHERE is_default = true)
        WHERE organization_id IS NULL
    """)

def downgrade() -> None:
    op.drop_constraint('fk_users_organization', 'users', type_='foreignkey')
    op.drop_column('users', 'organization_id')

Create Index CONCURRENTLY

"""Add composite index for user search queries.

Revision ID: idx_user_search
Revises: add_org_001
Create Date: 2026-01-15

Note: Uses CONCURRENTLY to avoid blocking reads/writes.
Migration must run outside transaction.
"""
from alembic import op

revision = 'idx_user_search'
down_revision = 'add_org_001'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # CRITICAL: Exit transaction for CONCURRENTLY
    op.execute("COMMIT")

    # Composite index for common query pattern
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org_status_created
        ON users (organization_id, status, created_at DESC)
        WHERE deleted_at IS NULL
    """)

    # GIN index for full-text search on name
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_name_trgm
        ON users USING gin (name gin_trgm_ops)
    """)

def downgrade() -> None:
    op.execute("COMMIT")
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_name_trgm")
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org_status_created")

Data Migration with Batching

"""Migrate user preferences from JSON to normalized table.

Revision ID: migrate_prefs
Revises: idx_user_search
Create Date: 2026-01-15

Handles millions of rows with batch processing.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
import json

revision = 'migrate_prefs'
down_revision = 'idx_user_search'
branch_labels = None
depends_on = None

BATCH_SIZE = 5000

def upgrade() -> None:
    # Create new preferences table
    op.create_table('user_preferences',
        sa.Column('id', UUID(as_uuid=True), primary_key=True,
                  server_default=sa.text('gen_random_uuid()')),
        sa.Column('user_id', UUID(as_uuid=True), nullable=False),
        sa.Column('key', sa.String(100), nullable=False),
        sa.Column('value', sa.Text, nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True),
                  server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True),
                  server_default=sa.func.now()),
    )

    # Add constraints
    op.create_foreign_key('fk_user_prefs_user', 'user_preferences', 'users',
                          ['user_id'], ['id'], ondelete='CASCADE')
    op.create_unique_constraint('uq_user_prefs_user_key', 'user_preferences',
                                ['user_id', 'key'])

    # Migrate data in batches
    conn = op.get_bind()

    offset = 0
    while True:
        # Fetch batch of users with preferences
        result = conn.execute(sa.text("""
            SELECT id, preferences
            FROM users
            WHERE preferences IS NOT NULL
              AND preferences != '{}'::jsonb
            ORDER BY id
            LIMIT :batch_size OFFSET :offset
        """), {'batch_size': BATCH_SIZE, 'offset': offset})

        rows = result.fetchall()
        if not rows:
            break

        # Transform and insert preferences
        for user_id, prefs in rows:
            if prefs:
                pref_dict = prefs if isinstance(prefs, dict) else json.loads(prefs)
                for key, value in pref_dict.items():
                    conn.execute(sa.text("""
                        INSERT INTO user_preferences (user_id, key, value)
                        VALUES (:user_id, :key, :value)
                        ON CONFLICT (user_id, key) DO NOTHING
                    """), {'user_id': user_id, 'key': key, 'value': str(value)})

        conn.commit()
        offset += BATCH_SIZE
        print(f"Migrated {offset} users...")

    # Create index after data load (faster than during inserts)
    op.create_index('idx_user_prefs_user_id', 'user_preferences', ['user_id'])

def downgrade() -> None:
    # Migrate data back to JSON column
    conn = op.get_bind()

    conn.execute(sa.text("""
        UPDATE users u
        SET preferences = (
            SELECT jsonb_object_agg(key, value)
            FROM user_preferences up
            WHERE up.user_id = u.id
        )
        WHERE EXISTS (
            SELECT 1 FROM user_preferences WHERE user_id = u.id
        )
    """))
    conn.commit()

    op.drop_table('user_preferences')

Enum Type Changes

"""Add new status values to user_status enum.

Revision ID: enum_status
Revises: migrate_prefs
Create Date: 2026-01-15

PostgreSQL enum modification requires special handling.
"""
from alembic import op

revision = 'enum_status'
down_revision = 'migrate_prefs'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Add new values to existing enum
    op.execute("ALTER TYPE user_status ADD VALUE IF NOT EXISTS 'suspended'")
    op.execute("ALTER TYPE user_status ADD VALUE IF NOT EXISTS 'pending_verification'")

    # Note: Cannot remove enum values in PostgreSQL
    # For removal, must recreate enum type (see downgrade for pattern)

def downgrade() -> None:
    # Recreate enum without new values (complex operation)
    # First, update any rows using new values
    op.execute("""
        UPDATE users
        SET status = 'inactive'
        WHERE status IN ('suspended', 'pending_verification')
    """)

    # Rename old enum
    op.execute("ALTER TYPE user_status RENAME TO user_status_old")

    # Create new enum without new values
    op.execute("CREATE TYPE user_status AS ENUM ('active', 'inactive', 'deleted')")

    # Update column to use new enum
    op.execute("""
        ALTER TABLE users
        ALTER COLUMN status TYPE user_status
        USING status::text::user_status
    """)

    # Drop old enum
    op.execute("DROP TYPE user_status_old")

Add Table with Partitioning

"""Add partitioned events table for analytics.

Revision ID: events_partition
Revises: enum_status
Create Date: 2026-01-15

Uses PostgreSQL native partitioning for time-series data.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB

revision = 'events_partition'
down_revision = 'enum_status'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Create partitioned parent table
    op.execute("""
        CREATE TABLE events (
            id UUID DEFAULT gen_random_uuid(),
            event_type VARCHAR(100) NOT NULL,
            user_id UUID,
            payload JSONB,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            PRIMARY KEY (id, created_at)
        ) PARTITION BY RANGE (created_at)
    """)

    # Create initial partitions (monthly)
    op.execute("""
        CREATE TABLE events_2026_01 PARTITION OF events
        FOR VALUES FROM ('2026-01-01') TO ('2026-02-01')
    """)
    op.execute("""
        CREATE TABLE events_2026_02 PARTITION OF events
        FOR VALUES FROM ('2026-02-01') TO ('2026-03-01')
    """)
    op.execute("""
        CREATE TABLE events_2026_03 PARTITION OF events
        FOR VALUES FROM ('2026-03-01') TO ('2026-04-01')
    """)

    # Create index on the parent table (cascades to all partitions).
    # NOTE: CREATE INDEX CONCURRENTLY is not supported on a partitioned
    # parent table. For large existing partitions, build the index
    # CONCURRENTLY on each partition and attach it to a parent index
    # created with CREATE INDEX ... ON ONLY.
    op.execute("""
        CREATE INDEX IF NOT EXISTS idx_events_user_type
        ON events (user_id, event_type, created_at DESC)
    """)

    # Add FK constraint
    op.create_foreign_key('fk_events_user', 'events', 'users',
                          ['user_id'], ['id'], ondelete='SET NULL')

def downgrade() -> None:
    op.drop_table('events_2026_03')
    op.drop_table('events_2026_02')
    op.drop_table('events_2026_01')
    op.drop_table('events')
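Hand-writing one block per month, as the upgrade above does, stops scaling once partitions are created on a schedule. A small helper (hypothetical `monthly_partition_ddl`, not part of Alembic) can generate the statements for `op.execute` or a cron job:

```python
from datetime import date

def monthly_partition_ddl(table: str, start: date, months: int) -> list:
    """Generate CREATE TABLE ... PARTITION OF statements for monthly ranges."""
    stmts = []
    year, month = start.year, start.month
    for _ in range(months):
        lo = date(year, month, 1)
        # First day of the following month is the exclusive upper bound.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
        hi = date(year, month, 1)
        stmts.append(
            f"CREATE TABLE {table}_{lo:%Y_%m} PARTITION OF {table}\n"
            f"FOR VALUES FROM ('{lo}') TO ('{hi}')"
        )
    return stmts
```

Generating the bounds from `date` arithmetic avoids the off-by-one and December-rollover mistakes that creep in when ranges are typed by hand.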

Rename Column Safely (Expand-Contract)

"""Phase 1: Add email_address alongside email.

Revision ID: rename_email_p1
Revises: events_partition
Create Date: 2026-01-15

Safe column rename using expand-contract pattern.
Run Phase 2 after application code is updated.
"""
from alembic import op
import sqlalchemy as sa

revision = 'rename_email_p1'
down_revision = 'events_partition'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Add new column
    op.add_column('users', sa.Column('email_address', sa.String(255), nullable=True))

    # Copy existing data
    op.execute("UPDATE users SET email_address = email")

    # Create sync trigger for the transition period. Fires on INSERT too,
    # so rows written by old application code (which only sets email)
    # stay consistent with rows written by new code.
    op.execute("""
        CREATE OR REPLACE FUNCTION sync_user_email()
        RETURNS TRIGGER AS $$
        BEGIN
            IF TG_OP = 'INSERT' THEN
                NEW.email_address = COALESCE(NEW.email_address, NEW.email);
                NEW.email = COALESCE(NEW.email, NEW.email_address);
            ELSIF NEW.email IS DISTINCT FROM OLD.email THEN
                NEW.email_address = NEW.email;
            ELSIF NEW.email_address IS DISTINCT FROM OLD.email_address THEN
                NEW.email = NEW.email_address;
            END IF;
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER trg_sync_email
        BEFORE INSERT OR UPDATE ON users
        FOR EACH ROW EXECUTE FUNCTION sync_user_email();
    """)

def downgrade() -> None:
    op.execute("DROP TRIGGER IF EXISTS trg_sync_email ON users")
    op.execute("DROP FUNCTION IF EXISTS sync_user_email()")
    op.drop_column('users', 'email_address')

OrchestKit Database Schema

OrchestKit Database Schema

Overview

OrchestKit uses PostgreSQL with PGVector for storing technical content analysis results, embeddings, and artifacts. The schema is optimized for:

  • Semantic search (vector similarity with HNSW indexing)
  • Full-text search (PostgreSQL tsvector with GIN indexing)
  • Hybrid search (RRF fusion of semantic + keyword results)
  • Content chunking (hierarchical storage of document sections)

Core Tables

analyses - Content Analysis Records

Purpose: Stores URLs being analyzed, their content, embeddings, and processing status.

CREATE TABLE analyses (
    -- Primary key
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),

    -- Required fields
    url TEXT NOT NULL,
    content_type VARCHAR(50) NOT NULL,  -- 'article', 'video', 'repo'
    status VARCHAR(50) NOT NULL DEFAULT 'pending',  -- 'pending', 'processing', 'complete', 'failed'

    -- Optional fields
    title TEXT,
    raw_content TEXT,

    -- Vector embedding (OpenAI text-embedding-3-small: 1536 dimensions)
    content_embedding VECTOR(1536),

    -- Full-text search vector (auto-populated by trigger)
    search_vector TSVECTOR,

    -- Flexible metadata
    extraction_metadata JSONB,  -- {"fetch_time_ms": 1234, "charset": "utf-8"}

    -- Context Engineering (Issue #244 - Handle Pattern)
    content_summary TEXT,  -- LLM-generated summary for lightweight state refs
    content_sections JSONB,  -- {"code_blocks": [...], "headings": [...], "word_count": 5000}

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_analyses_url ON analyses(url);
CREATE INDEX idx_analyses_status ON analyses(status);

-- GIN index for full-text search
CREATE INDEX idx_analyses_search_vector ON analyses USING GIN(search_vector);

-- HNSW index for vector similarity search
CREATE INDEX idx_analyses_embedding_hnsw
ON analyses
USING hnsw (content_embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Partial index for common query (only completed analyses)
CREATE INDEX idx_analyses_completed
ON analyses(created_at DESC)
WHERE status = 'complete';

-- Trigger: Auto-update search_vector on INSERT/UPDATE
CREATE TRIGGER analyses_search_vector_trigger
BEFORE INSERT OR UPDATE OF title, url, raw_content ON analyses
FOR EACH ROW EXECUTE FUNCTION analyses_search_vector_update();

Design Decisions:

  1. UUID Primary Key: Better for distributed systems, avoids sequential ID guessing
  2. JSONB for Metadata: Flexible schema for extraction metadata (fetch time, charset, etc.)
  3. Vector + TSVector: Dual search strategy (semantic + keyword)
  4. Partial Index: 90% of queries filter by status = 'complete'

artifacts - Generated Implementation Guides

Purpose: Stores markdown documents generated from analysis results.

CREATE TABLE artifacts (
    -- Primary key
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),

    -- Foreign key (cascade delete when analysis is deleted)
    analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,

    -- Required fields
    markdown_content TEXT NOT NULL,
    version INTEGER NOT NULL DEFAULT 1 CHECK (version > 0),

    -- Flexible metadata
    artifact_metadata JSONB,  -- {"topics": ["RAG", "LangGraph"], "complexity": "intermediate"}

    -- Telemetry
    download_count INTEGER NOT NULL DEFAULT 0 CHECK (download_count >= 0),
    trace_id VARCHAR(255),  -- Langfuse trace ID for observability

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_artifacts_analysis_id ON artifacts(analysis_id);
CREATE INDEX idx_artifacts_trace_id ON artifacts(trace_id);

-- GIN index for JSONB queries (e.g., find all artifacts with topic "RAG")
CREATE INDEX idx_artifacts_metadata_gin ON artifacts USING GIN(artifact_metadata);

Design Decisions:

  1. CASCADE Delete: When an analysis is deleted, its artifacts are automatically removed
  2. Version Field: Track artifact revisions (future: regenerate guides)
  3. JSONB Metadata: Topics, tags, complexity stored as flexible JSON
  4. Denormalized Counter: download_count for performance (no JOINs needed)

Query Examples:

-- Find all RAG-related artifacts
SELECT * FROM artifacts
WHERE artifact_metadata @> '{"topics": ["RAG"]}'::jsonb;

-- Get most downloaded artifacts
SELECT id, artifact_metadata->>'complexity', download_count
FROM artifacts
ORDER BY download_count DESC
LIMIT 10;

analysis_chunks - Chunk-Level Embeddings

Purpose: Store individual chunks of analyzed content with embeddings for granular semantic search.

CREATE TABLE analysis_chunks (
    -- Primary key
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),

    -- Foreign key
    analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,

    -- Chunking metadata
    granularity VARCHAR(20) NOT NULL CHECK (granularity IN ('coarse', 'fine', 'summary')),
    path JSONB NOT NULL,  -- ["section", "subsection", "chunk"]
    section_title TEXT,
    chunk_idx INTEGER NOT NULL CHECK (chunk_idx >= 0),
    chunk_total INTEGER NOT NULL CHECK (chunk_total > 0),

    -- Content metadata
    content_type VARCHAR(50),  -- Denormalized for filtering
    language VARCHAR(20),

    -- Deduplication
    hash VARCHAR(128) NOT NULL,  -- SHA256 hash of normalized content

    -- Embedding metadata
    model VARCHAR(100),  -- 'text-embedding-3-small'
    model_version VARCHAR(50),

    -- Content preview
    snippet TEXT,  -- First ~200 chars

    -- Vector embedding
    vector VECTOR(1536) NOT NULL,

    -- Full-text search vector (auto-populated by trigger)
    content_tsvector TSVECTOR,

    -- Telemetry fields
    token_count INTEGER,
    embedding_latency_ms FLOAT,
    was_truncated BOOLEAN DEFAULT FALSE,

    -- PII metadata (Issue #220)
    pii_flag BOOLEAN NOT NULL DEFAULT FALSE,
    pii_types JSONB,  -- ["email", "phone_us"] (never actual PII values)

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

    -- Table-level constraints
    CONSTRAINT chk_chunk_idx_lt_total CHECK (chunk_idx < chunk_total)
);

-- Indexes
CREATE INDEX idx_chunks_analysis_id ON analysis_chunks(analysis_id);
CREATE INDEX idx_chunks_hash ON analysis_chunks(hash);

-- Composite indexes
CREATE INDEX idx_chunks_analysis_granularity ON analysis_chunks(analysis_id, granularity);
CREATE INDEX idx_chunks_hash_model ON analysis_chunks(hash, model, model_version);

-- GIN index for full-text search
CREATE INDEX idx_chunks_content_tsvector ON analysis_chunks USING GIN(content_tsvector);

-- HNSW index for vector similarity search
CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Partial index (90% of chunks have content_type)
CREATE INDEX idx_chunks_content_type_created
ON analysis_chunks(content_type, created_at DESC)
WHERE content_type IS NOT NULL;

-- Triggers
CREATE TRIGGER chunks_tsvector_trigger
BEFORE INSERT OR UPDATE OF section_title, snippet ON analysis_chunks
FOR EACH ROW EXECUTE FUNCTION chunks_tsvector_update();

CREATE TRIGGER update_chunks_updated_at
BEFORE UPDATE ON analysis_chunks
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

Design Decisions:

  1. Granularity Levels: coarse (sections), fine (paragraphs), summary (TL;DR)
  2. Hierarchical Path: JSONB array for section navigation (["section", "subsection"])
  3. Hash-Based Deduplication: Avoid re-embedding identical content
  4. Denormalized content_type: Faster filtering without JOIN to analyses
  5. Telemetry Fields: Track token usage, latency for cost analysis
  6. PII Detection: Flag chunks with PII (email, phone) for audit compliance
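The `hash` column used for deduplication can be produced along these lines. The normalization step (collapse whitespace, lowercase) is an assumption about what "normalized content" means here; the authoritative rule lives with the chunking code.

```python
import hashlib
import re

def content_hash(text: str) -> str:
    """SHA-256 over whitespace-collapsed, lowercased text (fits VARCHAR(128))."""
    normalized = re.sub(r"\s+", " ", text).strip().lower()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
```

Looking up `(hash, model, model_version)` before calling the embedding API is what lets identical chunks skip re-embedding, which the composite index `idx_chunks_hash_model` above supports.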

Query Examples:

-- Semantic search within an analysis
SELECT id, section_title, snippet, 1 - (vector <=> :query_embedding) AS similarity
FROM analysis_chunks
WHERE analysis_id = :analysis_id
ORDER BY vector <=> :query_embedding
LIMIT 10;

-- Full-text search
SELECT id, section_title, ts_rank_cd(content_tsvector, query) AS rank
FROM analysis_chunks, to_tsquery('english', 'machine & learning') AS query
WHERE analysis_id = :analysis_id AND content_tsvector @@ query
ORDER BY rank DESC
LIMIT 10;

-- Hybrid search (RRF fusion) - see chunk_repository.py
-- Combines semantic + keyword results with reciprocal rank fusion
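A minimal sketch of the reciprocal rank fusion mentioned above (the actual implementation lives in chunk_repository.py, so the names here are illustrative): each result contributes 1 / (k + rank) from each list it appears in, and the scores are summed.

```python
def rrf_fuse(semantic_ids: list, keyword_ids: list, k: int = 60) -> list:
    """Reciprocal rank fusion: merge two ranked ID lists into one."""
    scores: dict = {}
    for ranked in (semantic_ids, keyword_ids):
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest combined score first
    return sorted(scores, key=scores.get, reverse=True)
```

Documents ranked highly by both searches float to the top; k = 60 is the conventional damping constant from the original RRF paper.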

agent_findings - Multi-Agent Analysis Results

Purpose: Store specialized agent outputs (Tech Comparator, Security Auditor, etc.).

CREATE TABLE agent_findings (
    -- Primary key
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),

    -- Foreign key
    analysis_id UUID NOT NULL REFERENCES analyses(id) ON DELETE CASCADE,

    -- Agent metadata
    agent_type VARCHAR(100) NOT NULL,  -- 'tech_comparator', 'security_auditor', etc.

    -- Findings (flexible schema)
    findings JSONB NOT NULL,

    -- Confidence score
    confidence_score FLOAT,

    -- Telemetry
    processing_time_ms INTEGER,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_agent_findings_analysis_id ON agent_findings(analysis_id);
CREATE INDEX idx_agent_findings_agent_type ON agent_findings(agent_type);

Design Decisions:

  1. JSONB Findings: Each agent has different output structure (flexibility over schema rigidity)
  2. Agent Type Index: Fast filtering by agent type
  3. No Versioning: Findings are immutable (re-run analysis creates new record)

Supporting Tables

annotation_queue - Human-in-the-Loop Feedback

Purpose: Queue chunks/artifacts for human review and feedback collection.

CREATE TABLE annotation_queue (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    artifact_id UUID NOT NULL REFERENCES artifacts(id) ON DELETE CASCADE,
    chunk_id UUID REFERENCES analysis_chunks(id) ON DELETE SET NULL,
    annotation_type VARCHAR(50) NOT NULL,  -- 'quality', 'relevance', 'accuracy'
    status VARCHAR(50) NOT NULL DEFAULT 'pending',
    feedback JSONB,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

agent_memory - Persistent Agent Memory

Purpose: Store agent learnings across sessions (RAG for agent improvements).

CREATE TABLE agent_memory (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    agent_type VARCHAR(100) NOT NULL,
    memory_type VARCHAR(50) NOT NULL,  -- 'success', 'failure', 'pattern'
    content TEXT NOT NULL,
    embedding VECTOR(1536),
    metadata JSONB,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

Embedding Storage Pattern

Why PGVector?

  1. Co-location: Embeddings live alongside metadata (no separate vector DB)
  2. ACID Guarantees: Transactions ensure consistency
  3. Hybrid Search: Native support for keyword + semantic search
  4. Cost-Effective: No additional infrastructure (Redis, Pinecone, Weaviate)

HNSW Index Configuration

CREATE INDEX idx_chunks_vector_hnsw
ON analysis_chunks
USING hnsw (vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

Parameters:

  • m = 16: Connections per layer (16 is a good balance of speed and accuracy)

  • ef_construction = 64: Build quality (higher = better recall, slower indexing)

Query-Time Tuning:

-- Increase search quality (default ef_search = 40)
SET hnsw.ef_search = 100;

-- Then run semantic search
SELECT * FROM analysis_chunks
ORDER BY vector <=> :query_embedding
LIMIT 10;

Embedding Model Details

  • Model: OpenAI text-embedding-3-small
  • Dimensions: 1536
  • Distance Metric: Cosine similarity (vector_cosine_ops)
  • Normalization: Vectors are normalized (unit length) before storage
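As a sketch of why normalization matters, unit-length vectors (assumed to be produced in the ingestion pipeline before storage) make cosine distance depend only on direction, which is what the `<=>` operator compares:

```python
import math

def normalize(vec: list) -> list:
    """Scale a vector to unit length so cosine distance compares direction only."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]

def cosine_distance(a: list, b: list) -> float:
    """What pgvector's <=> operator computes: 1 - cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (na * nb)
```

For unit vectors, cosine distance and (half the squared) Euclidean distance rank results identically, which is why pre-normalizing keeps similarity scores like `1 - (vector <=> :query_embedding)` in a clean 0-to-1-ish range.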

Full-Text Search Pattern

Weighted TSVector

-- Trigger function (analyses table)
CREATE FUNCTION analyses_search_vector_update() RETURNS TRIGGER AS $$
BEGIN
    NEW.search_vector :=
        setweight(to_tsvector('english', COALESCE(NEW.title, '')), 'A') ||  -- Highest weight
        setweight(to_tsvector('english', COALESCE(NEW.url, '')), 'B') ||
        setweight(to_tsvector('english', COALESCE(NEW.raw_content, '')), 'C');
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Weights:

  • A: Most important (title)
  • B: Medium importance (URL)
  • C: Lower importance (content)
  • D: Lowest importance (not used)

Ranking Function

-- ts_rank_cd (cover density) rewards term proximity - better for most use cases
SELECT id, ts_rank_cd(search_vector, query) AS rank
FROM analyses, to_tsquery('english', 'machine & learning') AS query
WHERE search_vector @@ query
ORDER BY rank DESC;

Relationship Modeling

Cascade Deletes

Child tables use ON DELETE CASCADE to maintain referential integrity; the one optional reference (annotation_queue.chunk_id) uses ON DELETE SET NULL instead:

analyses (parent)
├── artifacts (ON DELETE CASCADE)
├── analysis_chunks (ON DELETE CASCADE)
└── agent_findings (ON DELETE CASCADE)

artifacts (parent)
└── annotation_queue (ON DELETE CASCADE)

analysis_chunks (parent)
└── annotation_queue (ON DELETE SET NULL)  -- Optional reference

Example: Deleting an Analysis

-- Single DELETE cascades to all children
DELETE FROM analyses WHERE id = :analysis_id;

-- Automatically deletes:
-- - All artifacts for this analysis
-- - All chunks for this analysis
-- - All agent findings for this analysis
-- - All annotation queue entries for these artifacts/chunks

Migration History

Key Migrations

  1. e3c50d69e442_enable_pgvector.py

    • Enable PGVector extension
  2. a37ac3b6a635_initial_schema.py

    • Create analyses, artifacts, agent_findings tables
  3. 20251204091348_add_fulltext_search.py

    • Add search_vector to analyses
    • Create GIN index + trigger for auto-update
    • Add HNSW index for content_embedding
  4. 20251209120000_add_analysis_chunks.py

    • Create analysis_chunks table
  5. 20251210_harden_embedding_pipeline.py

    • Add HNSW index on analysis_chunks.vector
    • Add content_tsvector with GIN index
    • Add telemetry fields (token_count, embedding_latency_ms)
    • Add CHECK constraints (chk_granularity, chk_chunk_idx_lt_total)
    • Add trigger for auto-updating content_tsvector
  6. 20251210_add_cascade_delete.py

    • Update foreign keys to ON DELETE CASCADE
  7. 20251210_add_pii_columns.py

    • Add pii_flag and pii_types to analysis_chunks
  8. 20251218_backfill_chunks_tsvector.py

    • Backfill content_tsvector for existing chunks (batched updates)
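The batched backfill in migration 8 can be sketched as a loop that updates a bounded slice of rows per statement; the batch size, helper name, and SQL below are illustrative (the real logic lives in 20251218_backfill_chunks_tsvector.py).

```python
def backfill_batches(ids: list, batch_size: int = 1000):
    """Yield bounded ID slices so each UPDATE touches at most batch_size rows."""
    for start in range(0, len(ids), batch_size):
        yield ids[start:start + batch_size]

# Hypothetical usage inside the migration's upgrade():
#
# for batch in backfill_batches(all_chunk_ids, batch_size=1000):
#     op.execute(
#         sa.text(
#             "UPDATE analysis_chunks "
#             "SET content_tsvector = to_tsvector('english', "
#             "    coalesce(section_title,'') || ' ' || coalesce(snippet,'')) "
#             "WHERE id = ANY(:ids)"
#         ).bindparams(ids=batch)
#     )
#     # Short statements keep row locks brief and avoid one giant transaction.
```

Bounding each UPDATE is what makes the backfill safe on a live table: no single statement holds locks on more than one batch of rows.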

Performance Characteristics

Table Sizes (Golden Dataset)

  • Analyses: 98 rows
  • Artifacts: 98 rows
  • Chunks: 415 rows

Index Sizes

  • HNSW Indexes: ~2-3x vector data size
  • GIN Indexes: ~40-60% of indexed text size
  • B-Tree Indexes: ~30-40% of indexed column size

Query Performance

| Query Type | Latency (p95) | Notes |
| --- | --- | --- |
| Semantic search (chunks) | <50ms | HNSW index with m=16 |
| Full-text search (chunks) | <30ms | GIN index on tsvector |
| Hybrid search (chunks) | <100ms | RRF fusion of both |
| Analysis by URL | <10ms | B-tree index on url |

Best Practices Applied

  1. Indexed All Foreign Keys: Every FK has a B-tree index for join performance
  2. Cascade Deletes: Parent-child relationships use ON DELETE CASCADE
  3. Triggers for Computed Fields: updated_at, search_vector, content_tsvector
  4. Partial Indexes: Filter common query patterns (WHERE status = 'complete')
  5. JSONB for Flexible Data: Metadata, findings, hierarchical paths
  6. Normalized Tables for Structured Data: Chunks, artifacts, findings
  7. Denormalization for Performance: download_count, content_type in chunks
  8. Telemetry Fields: Track token usage, latency, PII detection
  9. CHECK Constraints: Data integrity (granularity, chunk_idx < chunk_total)
  10. UUID Primary Keys: Better for distributed systems, no sequential guessing

Future Enhancements

  1. Materialized Views: Pre-computed aggregates for dashboards
  2. Partitioning: Partition analysis_chunks by analysis_id (when >10M rows)
  3. Read Replicas: Scale read queries across multiple PostgreSQL instances
  4. Connection Pooling: PgBouncer for reduced connection overhead
  5. Archive Old Analyses: Move completed analyses >90 days to cold storage