Domain Driven Design
Domain-Driven Design tactical patterns for complex business domains. Use when modeling entities, value objects, domain services, repositories, or establishing bounded contexts.
Primary Agent: backend-system-architect
Domain-Driven Design Tactical Patterns
Model complex business domains with entities, value objects, and bounded contexts.
Overview
- Modeling complex business logic
- Separating domain from infrastructure
- Establishing clear boundaries between subdomains
- Building rich domain models with behavior
- Implementing ubiquitous language in code
Building Blocks Overview
┌─────────────────────────────────────────────────────────────┐
│ DDD Building Blocks │
├─────────────────────────────────────────────────────────────┤
│ ENTITIES VALUE OBJECTS AGGREGATES │
│ Order (has ID) Money (no ID) [Order]→Items │
│ │
│ DOMAIN SERVICES REPOSITORIES DOMAIN EVENTS │
│ PricingService IOrderRepository OrderSubmitted │
│ │
│ FACTORIES SPECIFICATIONS MODULES │
│ OrderFactory OverdueOrderSpec orders/, payments/ │
└─────────────────────────────────────────────────────────────┘Quick Reference
Entity (Has Identity)
from dataclasses import dataclass, field
from uuid import UUID
from uuid_utils import uuid7
@dataclass
class Order:
"""Entity: Has identity, mutable state, lifecycle."""
id: UUID = field(default_factory=uuid7)
customer_id: UUID = field(default=None)
status: str = "draft"
def __eq__(self, other: object) -> bool:
if not isinstance(other, Order):
return NotImplemented
return self.id == other.id # Identity equality
def __hash__(self) -> int:
return hash(self.id)See entities-value-objects.md for complete patterns.
Value Object (Immutable)
from dataclasses import dataclass
from decimal import Decimal
@dataclass(frozen=True) # MUST be frozen!
class Money:
"""Value Object: Defined by attributes, not identity."""
amount: Decimal
currency: str
def __add__(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError("Cannot add different currencies")
return Money(self.amount + other.amount, self.currency)See entities-value-objects.md for Address, DateRange examples.
Key Decisions
| Decision | Recommendation |
|---|---|
| Entity vs VO | Has unique ID + lifecycle? Entity. Otherwise VO |
| Entity equality | By ID, not attributes |
| Value object mutability | Always immutable (frozen=True) |
| Repository scope | One per aggregate root |
| Domain events | Collect in entity, publish after persist |
| Context boundaries | By business capability, not technical |
Rules Quick Reference
| Rule | Impact | What It Covers |
|---|---|---|
| aggregate-boundaries | HIGH | Aggregate root design, reference by ID, one-per-transaction |
| aggregate-invariants | HIGH | Business rule enforcement, specification pattern |
| aggregate-sizing | HIGH | Right-sizing, when to split, eventual consistency |
When NOT to Use
Under 5 entities? Skip DDD entirely. The ceremony costs more than the benefit.
| Pattern | Interview | Hackathon | MVP | Growth | Enterprise | Simpler Alternative |
|---|---|---|---|---|---|---|
| Aggregates | OVERKILL | OVERKILL | OVERKILL | SELECTIVE | APPROPRIATE | Plain dataclasses with validation |
| Bounded contexts | OVERKILL | OVERKILL | OVERKILL | BORDERLINE | APPROPRIATE | Python packages with clear imports |
| CQRS | OVERKILL | OVERKILL | OVERKILL | OVERKILL | WHEN JUSTIFIED | Single model for read/write |
| Value objects | OVERKILL | OVERKILL | BORDERLINE | APPROPRIATE | REQUIRED | Typed fields on the entity |
| Domain events | OVERKILL | OVERKILL | OVERKILL | SELECTIVE | APPROPRIATE | Direct method calls between services |
| Repository pattern | OVERKILL | OVERKILL | BORDERLINE | APPROPRIATE | REQUIRED | Direct ORM queries in service layer |
Rule of thumb: DDD adds ~40% code overhead. Only worth it when domain complexity genuinely demands it (5+ entities with invariants spanning multiple objects). A CRUD app with DDD is a red flag.
Anti-Patterns (FORBIDDEN)
# NEVER have anemic domain models (data-only classes)
@dataclass
class Order:
id: UUID
items: list # WRONG - no behavior!
# NEVER leak infrastructure into domain
class Order:
def save(self, session: Session): # WRONG - knows about DB!
# NEVER use mutable value objects
@dataclass # WRONG - missing frozen=True
class Money:
amount: Decimal
# NEVER have repositories return ORM models
async def get(self, id: UUID) -> OrderModel: # WRONG - return domain!Related Skills
aggregate-patterns- Deep dive on aggregate designork:distributed-systems- Cross-aggregate coordinationork:database-patterns- Schema design for DDD
References
- Entities & Value Objects - Full patterns
- Repositories - Repository pattern implementation
- Domain Events - Event collection and publishing
- Bounded Contexts - Context mapping and ACL
Capability Details
entities
Keywords: entity, identity, lifecycle, mutable, domain object Solves: Model entities in Python, identity equality, adding behavior
value-objects
Keywords: value object, immutable, frozen, dataclass, structural equality Solves: Create immutable value objects, when to use VO vs entity
domain-services
Keywords: domain service, business logic, cross-aggregate, stateless Solves: When to use domain service, logic spanning aggregates
repositories
Keywords: repository, persistence, collection, IRepository, protocol Solves: Implement repository pattern, abstract DB access, ORM mapping
bounded-contexts
Keywords: bounded context, context map, ACL, subdomain, ubiquitous language Solves: Define bounded contexts, integrate with ACL, context relationships
Rules (3)
Define aggregate root boundaries correctly to prevent cross-transaction data corruption — HIGH
Aggregate Root Boundaries and Consistency
Aggregates define transactional consistency boundaries. The root controls all access to children and enforces the one-aggregate-per-transaction rule.
Four Core Rules
- Root controls access — External code only references aggregate root
- Transactional boundary — One aggregate per transaction
- Reference by ID — Never hold object references to other aggregates
- Invariants enforced — Root ensures all business rules before state changes
Correct — Aggregate Root Pattern
from dataclasses import dataclass, field
from uuid import UUID
from uuid_utils import uuid7
@dataclass
class OrderAggregate:
"""Aggregate root — all access goes through here."""
id: UUID = field(default_factory=uuid7)
customer_id: UUID # Reference by ID, not Customer object!
_items: list["OrderItem"] = field(default_factory=list)
status: str = "draft"
@property
def items(self) -> tuple["OrderItem", ...]:
return tuple(self._items) # Expose immutable view
def add_item(self, product_id: UUID, quantity: int, price: "Money") -> None:
self._ensure_modifiable()
if len(self._items) >= self.MAX_ITEMS:
raise DomainError("Max items exceeded")
self._items.append(OrderItem(product_id, quantity, price))Incorrect — Cross-Aggregate References
# NEVER reference aggregates by object
@dataclass
class Order:
customer: Customer # WRONG — holds object reference
# Correct: customer_id: UUID
# NEVER modify multiple aggregates in one transaction
def submit_order(order, inventory):
order.submit()
inventory.reserve(order.items) # WRONG — two aggregates in one tx
# Correct: use domain events for cross-aggregate coordination
# NEVER expose mutable collections
def items(self) -> list:
return self._items # WRONG — caller can mutate
# Correct: return tuple(self._items)Key Rules
- External code accesses children only through the aggregate root
- Cross-aggregate coordination uses domain events, not shared transactions
- Reference other aggregates by ID, never by object
- Expose collections as immutable views (tuple, frozenset)
- One aggregate = one repository = one transaction boundary
Enforce business invariants within aggregates to prevent invalid domain state propagation — HIGH
Enforcing Business Invariants
The aggregate root is responsible for enforcing all business rules before allowing state transitions. Invariants must be checked on every mutation.
Invariant Enforcement Pattern
from dataclasses import dataclass, field
from uuid import UUID
@dataclass
class OrderAggregate:
MAX_ITEMS = 100
id: UUID
_items: list["OrderItem"] = field(default_factory=list)
status: str = "draft"
_events: list["DomainEvent"] = field(default_factory=list)
def add_item(self, product_id: UUID, quantity: int, price: "Money") -> None:
"""Add item with invariant checks."""
self._ensure_modifiable()
if len(self._items) >= self.MAX_ITEMS:
raise DomainError("Max items exceeded")
if quantity <= 0:
raise DomainError("Quantity must be positive")
self._items.append(OrderItem(product_id, quantity, price))
def submit(self) -> None:
"""Submit with business rule validation."""
self._ensure_modifiable()
if not self._items:
raise DomainError("Cannot submit empty order")
self.status = "submitted"
self._events.append(OrderSubmitted(self.id))
def _ensure_modifiable(self) -> None:
if self.status != "draft":
raise DomainError(f"Cannot modify {self.status} order")Specification Pattern for Complex Invariants
from abc import ABC, abstractmethod
class Specification(ABC):
@abstractmethod
def is_satisfied_by(self, candidate) -> bool: ...
def and_(self, other: "Specification") -> "Specification":
return AndSpecification(self, other)
class OverdueOrderSpec(Specification):
def is_satisfied_by(self, order: Order) -> bool:
return (
order.status == "submitted"
and order.created_at < datetime.now() - timedelta(days=30)
)
# Usage
overdue = OverdueOrderSpec()
overdue_orders = [o for o in orders if overdue.is_satisfied_by(o)]Domain Event Collection
@dataclass
class OrderAggregate:
_events: list["DomainEvent"] = field(default_factory=list)
def collect_events(self) -> list["DomainEvent"]:
"""Collect and clear events — publish AFTER persist."""
events = list(self._events)
self._events.clear()
return eventsIncorrect — no invariant checks, allows invalid state:
@dataclass
class OrderAggregate:
id: UUID
_items: list["OrderItem"] = field(default_factory=list)
status: str = "draft"
def add_item(self, product_id: UUID, quantity: int, price: "Money") -> None:
# No checks! Allows negative quantity, submitted order modification
self._items.append(OrderItem(product_id, quantity, price))
def submit(self) -> None:
# No check for empty order!
self.status = "submitted"Correct — enforce invariants on every mutation:
@dataclass
class OrderAggregate:
MAX_ITEMS = 100
id: UUID
_items: list["OrderItem"] = field(default_factory=list)
status: str = "draft"
def add_item(self, product_id: UUID, quantity: int, price: "Money") -> None:
self._ensure_modifiable() # Guard clause
if len(self._items) >= self.MAX_ITEMS:
raise DomainError("Max items exceeded")
if quantity <= 0:
raise DomainError("Quantity must be positive")
self._items.append(OrderItem(product_id, quantity, price))
def submit(self) -> None:
self._ensure_modifiable()
if not self._items:
raise DomainError("Cannot submit empty order")
self.status = "submitted"
def _ensure_modifiable(self) -> None:
if self.status != "draft":
raise DomainError(f"Cannot modify {self.status} order")Key Rules
- Every mutation method checks invariants before modifying state
- Guard clauses at the top of every public method
- Use the specification pattern for complex, reusable business rules
- Collect domain events in the aggregate, publish after successful persistence
- Raise
DomainError(not generic exceptions) for invariant violations - Status transitions follow explicit state machine rules
Right-size aggregates to balance lock contention against consistency guarantee requirements — HIGH
Right-Sizing Aggregates
Keep aggregates small. Large aggregates cause lock contention and slow operations. Split when collections grow unbounded or when different parts change at different rates.
Sizing Guidelines
| Signal | Action |
|---|---|
| < 20 children | Keep as single aggregate |
| 20-100 children | Consider splitting by access pattern |
| 100+ children | Must split — use reference by ID |
| Unbounded collection | Always split — never allow unbounded growth |
| Different change rates | Split into separate aggregates |
Correct — Small, Focused Aggregates
@dataclass
class OrderAggregate:
"""Small aggregate — bounded items list."""
id: UUID
customer_id: UUID # Reference by ID
_items: list["OrderItem"] # Bounded: max 100
MAX_ITEMS = 100
@dataclass
class CustomerAggregate:
"""Separate aggregate — customer has different lifecycle."""
id: UUID
name: str
email: str
# NO orders list here — unbounded!Incorrect — Oversized Aggregate
@dataclass
class CustomerAggregate:
id: UUID
name: str
orders: list["Order"] # WRONG — unbounded growth
reviews: list["Review"] # WRONG — different change rate
notifications: list["Notification"] # WRONG — unrelated concernWhen to Split
- Unbounded collections — If a collection can grow without limit, extract it
- Different change rates — If parts of the aggregate change at different frequencies
- Lock contention — If concurrent modifications frequently conflict
- Performance — If loading the full aggregate is slow
Cross-Aggregate Consistency
After splitting, use eventual consistency between aggregates:
# Order aggregate publishes event
class OrderSubmitted(DomainEvent):
order_id: UUID
customer_id: UUID
# Inventory aggregate handles event (eventually consistent)
class InventoryEventHandler:
async def handle_order_submitted(self, event: OrderSubmitted) -> None:
inventory = await self.repo.get_for_order(event.order_id)
inventory.reserve_items(event.order_id)
await self.repo.save(inventory)Key Rules
- Prefer small aggregates (< 20 children)
- Never allow unbounded collections inside an aggregate
- Use reference by ID for cross-aggregate relationships
- Apply eventual consistency across aggregate boundaries via domain events
- Split by change rate — parts that change together stay together
- Measure lock contention — split if concurrent modifications conflict
References (4)
Bounded Contexts
Bounded Contexts
Context Map
┌─────────────────────────────────────────────────────────────────┐
│ E-Commerce System │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Identity │ │ Catalog │ │ Orders │ │
│ │ Context │ │ Context │ │ Context │ │
│ ├──────────────┤ ├──────────────┤ ├──────────────┤ │
│ │ • User │ │ • Product │ │ • Order │ │
│ │ • Account │ ──── │ • Category │ ──── │ • LineItem │ │
│ │ • Auth │ ACL │ • Price │ ACL │ • Payment │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └─────────────────────┼─────────────────────┘ │
│ │ │
│ ┌──────────────────────┐ │
│ │ Shared Kernel │ │
│ │ • Money VO │ │
│ │ • Email VO │ │
│ │ • Address VO │ │
│ └──────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
ACL = Anti-Corruption LayerContext Integration Patterns
| Pattern | Use When | Direction |
|---|---|---|
| Shared Kernel | Teams collaborate closely | Bidirectional |
| Customer-Supplier | Upstream provides, downstream consumes | Unidirectional |
| Conformist | Must follow external model | Unidirectional |
| Anti-Corruption Layer | Protect domain from external changes | Inbound |
| Open Host Service | Provide API for many consumers | Outbound |
| Published Language | Standard schema (e.g., events) | Bidirectional |
Anti-Corruption Layer
# orders/infrastructure/catalog_acl.py
"""Anti-Corruption Layer for Catalog context."""
from dataclasses import dataclass
from uuid import UUID
from orders.domain.value_objects import ProductSnapshot
@dataclass
class CatalogACL:
"""Translate Catalog context to Orders context."""
def __init__(self, catalog_client: "CatalogServiceClient"):
self._catalog = catalog_client
async def get_product_snapshot(self, product_id: UUID) -> ProductSnapshot:
"""Get product data as Orders context value object.
Translates Catalog's Product entity to Orders' ProductSnapshot VO.
This protects Orders from Catalog's internal model changes.
"""
# Call external Catalog service
catalog_product = await self._catalog.get_product(product_id)
# Translate to our domain model
return ProductSnapshot(
product_id=catalog_product.id,
name=catalog_product.name,
price=Money(
amount=catalog_product.current_price,
currency=catalog_product.price_currency,
),
sku=catalog_product.sku,
# Ignore Catalog-specific fields we don't need
# like: catalog_product.category_id, .supplier_id, etc.
)
# orders/domain/value_objects.py
@dataclass(frozen=True)
class ProductSnapshot:
"""Snapshot of product at order time.
Orders context doesn't track product changes - it captures
a snapshot when the order is created.
"""
product_id: UUID
name: str
price: Money
sku: strContext Boundaries in Code
src/
├── identity/ # Identity Bounded Context
│ ├── domain/
│ │ ├── entities/
│ │ │ └── user.py
│ │ ├── value_objects/
│ │ │ └── email.py
│ │ └── repositories/
│ │ └── user_repository.py
│ ├── application/
│ │ └── services/
│ │ └── auth_service.py
│ └── infrastructure/
│ └── repositories/
│ └── sqlalchemy_user_repository.py
│
├── catalog/ # Catalog Bounded Context
│ ├── domain/
│ │ ├── entities/
│ │ │ ├── product.py
│ │ │ └── category.py
│ │ └── value_objects/
│ │ └── price.py
│ └── ...
│
├── orders/ # Orders Bounded Context
│ ├── domain/
│ │ ├── entities/
│ │ │ └── order.py
│ │ └── value_objects/
│ │ └── product_snapshot.py # Local copy, not shared!
│ └── infrastructure/
│ └── acl/
│ ├── catalog_acl.py # Anti-corruption layer
│ └── identity_acl.py
│
└── shared_kernel/ # Shared across contexts
└── value_objects/
├── money.py
└── address.pyCross-Context Communication
# Using domain events for loose coupling
# orders/application/services/order_service.py
class OrderService:
"""Order service publishes events for other contexts."""
async def place_order(self, order: Order) -> None:
order.place()
await self._repo.update(order)
# Publish event - other contexts subscribe
await self._events.publish(OrderPlaced(
order_id=order.id,
customer_id=order.customer_id,
items=[
{"product_id": str(i.product_id), "quantity": i.quantity}
for i in order.items
],
))
# inventory/application/handlers/order_handlers.py
class OrderEventHandler:
"""Inventory context handles order events."""
async def handle_order_placed(self, event: dict) -> None:
"""Reserve inventory when order placed."""
for item in event["items"]:
await self._inventory.reserve(
product_id=UUID(item["product_id"]),
quantity=item["quantity"],
order_id=UUID(event["order_id"]),
)Context Mapping Decisions
# When to use Shared Kernel
# - Both teams own the code
# - Changes require coordination
# - Strong consistency needed
# shared_kernel/value_objects/money.py
@dataclass(frozen=True)
class Money:
"""Shared by Catalog, Orders, Payments contexts."""
amount: Decimal
currency: str
# When to use ACL
# - External system you don't control
# - Legacy system with different model
# - Third-party API
# orders/infrastructure/acl/payment_gateway_acl.py
class PaymentGatewayACL:
"""Translate Stripe API to our Payment model."""
async def charge(self, payment: Payment) -> PaymentResult:
# Call Stripe with their model
stripe_charge = await self._stripe.create_charge(
amount=int(payment.amount.amount * 100), # Stripe uses cents
currency=payment.amount.currency.lower(),
source=payment.stripe_token,
)
# Translate back to our model
return PaymentResult(
success=stripe_charge.status == "succeeded",
transaction_id=stripe_charge.id,
error=stripe_charge.failure_message,
)Domain Events
Domain Events
Event Definition
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import ClassVar
from uuid import UUID
from uuid_utils import uuid7 # UUIDv7 for time-ordered event IDs
@dataclass(frozen=True)
class DomainEvent:
"""Base class for domain events.
Uses UUIDv7 for time-ordered, sortable event IDs.
"""
event_id: UUID = field(default_factory=uuid7)
occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
event_type: ClassVar[str]
def to_dict(self) -> dict:
"""Serialize event for publishing."""
return {
"event_id": str(self.event_id),
"event_type": self.event_type,
"occurred_at": self.occurred_at.isoformat(),
"payload": self._payload(),
}
def _payload(self) -> dict:
"""Override to provide event-specific payload."""
return {}
@dataclass(frozen=True)
class UserCreated(DomainEvent):
"""Emitted when a new user is created."""
event_type: ClassVar[str] = "user.created"
user_id: UUID = field(default_factory=uuid7)
email: str = ""
def _payload(self) -> dict:
return {"user_id": str(self.user_id), "email": self.email}
@dataclass(frozen=True)
class UserActivated(DomainEvent):
"""Emitted when user account is activated."""
event_type: ClassVar[str] = "user.activated"
user_id: UUID = field(default_factory=uuid7)
def _payload(self) -> dict:
return {"user_id": str(self.user_id)}
@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
"""Emitted when an order is placed."""
event_type: ClassVar[str] = "order.placed"
order_id: UUID = field(default_factory=uuid7)
customer_id: UUID = field(default_factory=uuid7)
total_amount: str = "0.00"
currency: str = "USD"
def _payload(self) -> dict:
return {
"order_id": str(self.order_id),
"customer_id": str(self.customer_id),
"total": {"amount": self.total_amount, "currency": self.currency},
}Entity Event Collection
@dataclass
class Entity:
"""Base entity with event collection."""
_domain_events: list[DomainEvent] = field(default_factory=list, repr=False)
def add_event(self, event: DomainEvent) -> None:
"""Register domain event for later publishing."""
self._domain_events.append(event)
def collect_events(self) -> list[DomainEvent]:
"""Collect and clear pending events."""
events = self._domain_events.copy()
self._domain_events.clear()
return events
@dataclass
class Order(Entity):
"""Order with domain events."""
id: UUID = field(default_factory=uuid7)
customer_id: UUID = field(default_factory=uuid7)
status: str = "draft"
def place(self) -> None:
"""Place the order."""
if self.status != "draft":
raise ValueError("Can only place draft orders")
self.status = "placed"
self.add_event(OrderPlaced(
order_id=self.id,
customer_id=self.customer_id,
total_amount=str(self.total.amount),
currency=self.total.currency,
))Event Publisher
from abc import abstractmethod
from typing import Protocol
class EventPublisher(Protocol):
"""Protocol for publishing domain events."""
@abstractmethod
async def publish(self, event: DomainEvent) -> None:
"""Publish single event."""
...
@abstractmethod
async def publish_all(self, events: list[DomainEvent]) -> None:
"""Publish multiple events."""
...
class InMemoryEventPublisher(EventPublisher):
"""In-memory publisher for testing."""
def __init__(self):
self.events: list[DomainEvent] = []
async def publish(self, event: DomainEvent) -> None:
self.events.append(event)
async def publish_all(self, events: list[DomainEvent]) -> None:
self.events.extend(events)
class RedisEventPublisher(EventPublisher):
"""Redis Streams event publisher."""
def __init__(self, redis_client, stream_name: str = "domain-events"):
self._redis = redis_client
self._stream = stream_name
async def publish(self, event: DomainEvent) -> None:
await self._redis.xadd(
self._stream,
event.to_dict(),
)
async def publish_all(self, events: list[DomainEvent]) -> None:
async with self._redis.pipeline() as pipe:
for event in events:
pipe.xadd(self._stream, event.to_dict())
await pipe.execute()Service Layer Event Publishing
class OrderService:
"""Application service that publishes domain events."""
def __init__(
self,
order_repo: OrderRepository,
event_publisher: EventPublisher,
):
self._orders = order_repo
self._events = event_publisher
async def place_order(self, order_id: UUID) -> Order:
"""Place order and publish events."""
order = await self._orders.get_or_raise(order_id)
# Business logic (adds events to entity)
order.place()
# Persist changes
await self._orders.update(order)
# Publish collected events
events = order.collect_events()
await self._events.publish_all(events)
return orderEvent Handlers
from typing import Callable, TypeVar
E = TypeVar("E", bound=DomainEvent)
class EventDispatcher:
"""Dispatch events to registered handlers."""
def __init__(self):
self._handlers: dict[str, list[Callable]] = {}
def register(
self,
event_type: str,
handler: Callable[[DomainEvent], None],
) -> None:
"""Register handler for event type."""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def dispatch(self, event: DomainEvent) -> None:
"""Dispatch event to all registered handlers."""
handlers = self._handlers.get(event.event_type, [])
for handler in handlers:
await handler(event)
# Handler registration
dispatcher = EventDispatcher()
async def send_welcome_email(event: UserCreated) -> None:
"""Send welcome email on user creation."""
await email_service.send_welcome(event.email)
async def update_analytics(event: UserCreated) -> None:
"""Update analytics on user creation."""
await analytics.track("user_created", {"user_id": str(event.user_id)})
dispatcher.register("user.created", send_welcome_email)
dispatcher.register("user.created", update_analytics)Entities Value Objects
Entities and Value Objects
Entity vs Value Object Decision
| Characteristic | Entity | Value Object |
|---|---|---|
| Identity | Has unique ID | No ID, defined by attributes |
| Equality | By ID | By all attributes |
| Mutability | Mutable (state changes) | Immutable (replace whole) |
| Lifecycle | Tracked over time | Created/discarded |
| Example | User, Order, Product | Email, Money, Address |
Entity Implementation (Python 2026)
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Self
from uuid import UUID
from uuid_utils import uuid7 # pip install uuid-utils (UUIDv7 support)
from app.domain.events import DomainEvent
@dataclass
class Entity:
"""Base entity with identity and domain events.
Uses UUIDv7 for time-ordered, index-friendly IDs.
PostgreSQL 18: Use gen_random_uuid_v7() for DB-generated IDs.
"""
id: UUID = field(default_factory=uuid7)
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
_domain_events: list[DomainEvent] = field(default_factory=list, repr=False)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Entity):
return NotImplemented
return self.id == other.id
def __hash__(self) -> int:
return hash(self.id)
def add_event(self, event: DomainEvent) -> None:
"""Add domain event for later publishing."""
self._domain_events.append(event)
def collect_events(self) -> list[DomainEvent]:
"""Collect and clear domain events."""
events = self._domain_events.copy()
self._domain_events.clear()
return events
@dataclass
class User(Entity):
"""User entity with business logic."""
email: str = ""
name: str = ""
status: str = "pending"
def activate(self) -> Self:
"""Activate user account."""
if self.status == "active":
raise ValueError("User already active")
self.status = "active"
self.updated_at = datetime.now(timezone.utc)
self.add_event(UserActivated(user_id=self.id))
return self
def change_email(self, new_email: str) -> Self:
"""Change user email with validation."""
if not Email.is_valid(new_email):
raise ValueError("Invalid email format")
old_email = self.email
self.email = new_email
self.updated_at = datetime.now(timezone.utc)
self.add_event(UserEmailChanged(
user_id=self.id,
old_email=old_email,
new_email=new_email,
))
return selfValue Object Implementation
from dataclasses import dataclass
from decimal import Decimal
from typing import Self
import re
@dataclass(frozen=True) # Immutable!
class Email:
"""Email value object with validation."""
value: str
def __post_init__(self):
if not self.is_valid(self.value):
raise ValueError(f"Invalid email: {self.value}")
@staticmethod
def is_valid(email: str) -> bool:
pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
return bool(re.match(pattern, email))
def __str__(self) -> str:
return self.value
@dataclass(frozen=True)
class Money:
"""Money value object with currency."""
amount: Decimal
currency: str = "USD"
def __post_init__(self):
# Validate via object.__setattr__ for frozen dataclass
if self.amount < 0:
raise ValueError("Amount cannot be negative")
if len(self.currency) != 3:
raise ValueError("Currency must be 3-letter code")
def add(self, other: Self) -> Self:
"""Add money (same currency only)."""
if self.currency != other.currency:
raise ValueError("Cannot add different currencies")
return Money(self.amount + other.amount, self.currency)
def multiply(self, factor: int | Decimal) -> Self:
"""Multiply by factor."""
return Money(self.amount * Decimal(factor), self.currency)
def __str__(self) -> str:
return f"{self.currency} {self.amount:.2f}"
@dataclass(frozen=True)
class Address:
"""Address value object."""
street: str
city: str
country: str
postal_code: str
def __post_init__(self):
if not all([self.street, self.city, self.country, self.postal_code]):
raise ValueError("All address fields required")
def format_single_line(self) -> str:
return f"{self.street}, {self.city}, {self.postal_code}, {self.country}"Using Value Objects in Entities
@dataclass
class Order(Entity):
"""Order entity using value objects."""
customer_email: Email = field(default_factory=lambda: Email("default@example.com"))
shipping_address: Address | None = None
total: Money = field(default_factory=lambda: Money(Decimal("0")))
items: list["OrderItem"] = field(default_factory=list)
def add_item(self, product_id: UUID, price: Money, quantity: int) -> Self:
"""Add item and recalculate total."""
item = OrderItem(
product_id=product_id,
price=price,
quantity=quantity,
)
self.items.append(item)
self.total = self._calculate_total()
return self
def _calculate_total(self) -> Money:
"""Calculate order total from items."""
total = Money(Decimal("0"))
for item in self.items:
total = total.add(item.price.multiply(item.quantity))
return totalAnti-Patterns
# WRONG: Value object with ID
@dataclass
class Money:
id: UUID # NO! Value objects have no identity
amount: Decimal
# WRONG: Mutable value object
@dataclass # Missing frozen=True!
class Email:
value: str
# WRONG: Entity equality by attributes
@dataclass
class User:
def __eq__(self, other):
return self.email == other.email # NO! Use ID
# WRONG: Business logic outside entity
def activate_user(user: User) -> None:
user.status = "active" # NO! Put in User.activate()
# WRONG: Using UUIDv4 in 2026
from uuid import uuid4
id: UUID = field(default_factory=uuid4) # NO! Use uuid7 for time-orderingPostgreSQL 18 UUIDv7 Integration
-- PostgreSQL 18 native UUIDv7 generation
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid_v7(),
email TEXT NOT NULL UNIQUE,
created_at TIMESTAMPTZ DEFAULT now()
);
-- UUIDv7 benefits for indexing
-- 1. Time-ordered: sequential inserts, less index fragmentation
-- 2. Sortable: ORDER BY id ≈ ORDER BY created_at
-- 3. Better cache locality: recent records clustered togetherRepositories
Repository Pattern
Repository Protocol (Interface)
from abc import abstractmethod
from typing import Protocol, TypeVar
from uuid import UUID
from app.domain.entities import Entity
T = TypeVar("T", bound=Entity)
class Repository(Protocol[T]):
"""Generic repository protocol for domain entities."""
@abstractmethod
async def get(self, id: UUID) -> T | None:
"""Get entity by ID, returns None if not found."""
...
@abstractmethod
async def get_or_raise(self, id: UUID) -> T:
"""Get entity by ID, raises if not found."""
...
@abstractmethod
async def add(self, entity: T) -> T:
"""Add new entity to repository."""
...
@abstractmethod
async def update(self, entity: T) -> T:
"""Update existing entity."""
...
@abstractmethod
async def delete(self, id: UUID) -> None:
"""Delete entity by ID."""
...
class UserRepository(Protocol):
"""User-specific repository with domain queries."""
async def get(self, id: UUID) -> "User | None": ...
async def get_or_raise(self, id: UUID) -> "User": ...
async def add(self, user: "User") -> "User": ...
async def update(self, user: "User") -> "User": ...
async def delete(self, id: UUID) -> None: ...
# Domain-specific queries
async def find_by_email(self, email: str) -> "User | None": ...
async def find_active_users(self, limit: int = 100) -> list["User"]: ...
async def exists_by_email(self, email: str) -> bool: ...SQLAlchemy Implementation
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.domain.entities import User
from app.domain.repositories import UserRepository
from app.infrastructure.models import UserModel
class SQLAlchemyUserRepository(UserRepository):
"""SQLAlchemy implementation of UserRepository."""
def __init__(self, session: AsyncSession):
self._session = session
async def get(self, id: UUID) -> User | None:
result = await self._session.get(UserModel, id)
return self._to_entity(result) if result else None
async def get_or_raise(self, id: UUID) -> User:
user = await self.get(id)
if not user:
raise UserNotFoundError(f"User {id} not found")
return user
async def add(self, user: User) -> User:
model = self._to_model(user)
self._session.add(model)
await self._session.flush()
return user
async def update(self, user: User) -> User:
model = await self._session.get(UserModel, user.id)
if not model:
raise UserNotFoundError(f"User {user.id} not found")
# Update model from entity
model.email = user.email
model.name = user.name
model.status = user.status
model.updated_at = user.updated_at
await self._session.flush()
return user
async def delete(self, id: UUID) -> None:
model = await self._session.get(UserModel, id)
if model:
await self._session.delete(model)
await self._session.flush()
async def find_by_email(self, email: str) -> User | None:
stmt = select(UserModel).where(UserModel.email == email)
result = await self._session.execute(stmt)
model = result.scalar_one_or_none()
return self._to_entity(model) if model else None
async def find_active_users(self, limit: int = 100) -> list[User]:
stmt = (
select(UserModel)
.where(UserModel.status == "active")
.limit(limit)
)
result = await self._session.execute(stmt)
return [self._to_entity(m) for m in result.scalars()]
async def exists_by_email(self, email: str) -> bool:
stmt = select(UserModel.id).where(UserModel.email == email).limit(1)
result = await self._session.execute(stmt)
return result.scalar_one_or_none() is not None
def _to_entity(self, model: UserModel) -> User:
"""Map database model to domain entity."""
return User(
id=model.id,
email=model.email,
name=model.name,
status=model.status,
created_at=model.created_at,
updated_at=model.updated_at,
)
def _to_model(self, entity: User) -> UserModel:
"""Map domain entity to database model."""
return UserModel(
id=entity.id,
email=entity.email,
name=entity.name,
status=entity.status,
created_at=entity.created_at,
updated_at=entity.updated_at,
)Unit of Work Pattern
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
class UnitOfWork:
"""Coordinates repositories and transaction management."""
def __init__(self, session: AsyncSession):
self._session = session
self.users = SQLAlchemyUserRepository(session)
self.orders = SQLAlchemyOrderRepository(session)
async def commit(self) -> None:
"""Commit transaction."""
await self._session.commit()
async def rollback(self) -> None:
"""Rollback transaction."""
await self._session.rollback()
@asynccontextmanager
async def unit_of_work(
session_factory,
) -> AsyncGenerator[UnitOfWork, None]:
"""Create unit of work context."""
async with session_factory() as session:
uow = UnitOfWork(session)
try:
yield uow
await uow.commit()
except Exception:
await uow.rollback()
raiseRepository Best Practices
# GOOD: Repository returns domain entities
async def get(self, id: UUID) -> User | None:
model = await self._session.get(UserModel, id)
return self._to_entity(model) if model else None
# BAD: Repository returns ORM models
async def get(self, id: UUID) -> UserModel | None: # Leaks infrastructure!
return await self._session.get(UserModel, id)
# GOOD: Domain-specific queries
async def find_eligible_for_discount(self) -> list[User]:
"""Find users eligible for loyalty discount."""
...
# BAD: Generic SQL queries in repository
async def find_by_query(self, query: str) -> list[User]: # Too generic!
...
# GOOD: Repository handles mapping
def _to_entity(self, model: UserModel) -> User:
return User(...)
# BAD: Caller handles mapping
user_dict = await repo.get_raw(id) # Returns dict, caller mapsChecklists (1)
Ddd Checklist
Domain-Driven Design Checklist
Strategic Design
Bounded Contexts
- Domain boundaries identified and documented
- Context map shows relationships (ACL, Shared Kernel, etc.)
- Each context has clear ownership
- Ubiquitous language defined per context
- Integration patterns chosen (events, API, shared DB)
Ubiquitous Language
- Domain terms documented in glossary
- Code uses domain terminology (not technical jargon)
- Team (dev + domain experts) agrees on terms
- Terms are context-specific (not global)
Tactical Design
Entities
- Identified by unique ID (prefer UUIDv7)
- Equality based on ID, not attributes
- Contains business logic (not anemic)
- State changes through methods, not setters
- Domain events emitted for significant changes
- PostgreSQL 18: Using
gen_random_uuid_v7()for IDs
Value Objects
- Immutable (
frozen=Truein dataclass) - Equality based on all attributes
- Self-validating in
__post_init__ - No identity (no ID field)
- Operations return new instances
Aggregates
- Aggregate root identified
- All access through aggregate root
- Invariants enforced within aggregate
- References to other aggregates by ID only
- Sized appropriately (not too large)
Repositories
- Interface defined in domain layer (Protocol)
- Implementation in infrastructure layer
- Returns domain entities (not ORM models)
- Domain-specific query methods
- Unit of Work for transaction management
Domain Events
- Events are immutable (frozen dataclass)
- Events named in past tense (OrderPlaced, not PlaceOrder)
- Events contain IDs, not full entities
- Collection on entity, publishing in service layer
- UUIDv7 for time-ordered event IDs
Domain Services
- Used for cross-entity operations
- Stateless
- Named with domain verbs (not technical)
- Coordinates entities, doesn't replace their logic
Layer Architecture
Domain Layer
- No infrastructure dependencies
- Entities, value objects, domain events
- Repository interfaces (Protocols)
- Domain services
Application Layer
- Use cases / application services
- Transaction management (Unit of Work)
- Event publishing
- DTO mapping
Infrastructure Layer
- Repository implementations
- ORM models and mapping
- External service clients
- Anti-corruption layers
Presentation Layer
- API routes/controllers
- Input validation (Pydantic)
- Response formatting
- No business logic
Code Quality
Naming
- Classes named with domain terms
- Methods use domain verbs
- No technical jargon in domain layer
- Consistent with ubiquitous language
Testing
- Domain logic unit tested
- Repository implementations tested
- Application services integration tested
- Domain events verified
Anti-Patterns Avoided
- No anemic domain model
- No business logic in controllers
- No ORM models in domain layer
- No circular dependencies between contexts
- No UUIDv4 (use UUIDv7 for time-ordering)
PostgreSQL 18 Specifics
UUIDv7 Usage
-
gen_random_uuid_v7()as column default - Python:
uuid_utils.uuid7()for app-generated IDs - Index on ID serves as ~created_at index
- No separate created_at index needed for sorting
Performance
- UUIDv7 reduces index fragmentation
- Recent records clustered together
- Better cache locality for recent queries
Doctor
OrchestKit doctor for health diagnostics. Use when running checks on plugin health, diagnosing problems, or troubleshooting issues.
Errors
Error pattern analysis and troubleshooting for Claude Code sessions. Use when handling errors, fixing failures, troubleshooting issues.
Last updated on