Book3 - Architecture Patterns with Python
By Harry Percival & Bob Gregory (O'Reilly, 2020)
Subtitle: Enabling Test-Driven Development, Domain-Driven Design, and Event-Driven Microservices
Core thesis: As software grows in complexity, the way we structure our code matters more than the way we write individual functions. This book applies DDD, TDD, and event-driven patterns in Python to build systems that are testable, maintainable, and scalable.
Introduction
Why Do Our Designs Go Wrong?
- Big ball of mud: the natural state of software when we don't actively manage complexity
- Symptoms: hard to change, hard to test, side-effect-laden code
- The cure: encapsulation (simplifying behavior & hiding data) and abstractions (simplified interfaces hiding complex details)
- Layering: each layer depends only on the layer below it
Encapsulation and Abstractions
- Encapsulation: simplifying behavior and hiding data behind a well-defined interface
- Abstraction: a simplified description of a system that emphasizes some details while ignoring others
- In traditional layered architecture, the problem is tight coupling to the database ("everything depends on the DB")
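A generic illustration of encapsulation (not from the book): callers use a small, well-named interface instead of reaching into the underlying data structure, so the internal representation can change freely.

```python
# Hypothetical example: the dict is hidden behind two methods, which is
# all the "well-defined interface" amounts to here.
class StockCounter:
    def __init__(self):
        self._quantities = {}  # internal state, hidden from callers

    def receive(self, sku: str, qty: int) -> None:
        self._quantities[sku] = self._quantities.get(sku, 0) + qty

    def available(self, sku: str) -> int:
        return self._quantities.get(sku, 0)

counter = StockCounter()
counter.receive("RED-CHAIR", 10)
assert counter.available("RED-CHAIR") == 10
```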
The Dependency Inversion Principle
- DIP: High-level modules should not depend on low-level modules. Both should depend on abstractions.
- Instead of BusinessLogic -> Database, use BusinessLogic -> AbstractRepository <- ConcreteRepository
- The domain model should be the most important layer and should have no dependencies on infrastructure
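A minimal DIP sketch (generic, not the book's allocation example): the high-level function depends only on an abstract base class, and the concrete detail is plugged in from outside. `AbstractSender`, `notify_out_of_stock`, and `RecordingSender` are hypothetical names for illustration.

```python
import abc

# High-level policy depends only on this abstraction...
class AbstractSender(abc.ABC):
    @abc.abstractmethod
    def send(self, message: str) -> None: ...

def notify_out_of_stock(sku: str, sender: AbstractSender) -> None:
    # knows nothing about how messages are actually delivered
    sender.send(f"Out of stock for sku {sku}")

# ...while the low-level detail implements the abstraction.
class RecordingSender(AbstractSender):
    def __init__(self):
        self.log = []

    def send(self, message: str) -> None:
        self.log.append(message)

sender = RecordingSender()
notify_out_of_stock("RED-CHAIR", sender)
assert sender.log == ["Out of stock for sku RED-CHAIR"]
```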
Part I: Building an Architecture to Support Domain Modeling
Chapter 1: Domain Modeling
What Is a Domain Model?
- Domain model: the mental map that business owners carry of the business processes they manage
- The software should mirror this mental map as closely as possible
- The book's example domain: a furniture allocation service (assigning incoming orders to batches of stock)
Exploring the Domain Language
- Key terms from the example domain:
- SKU (Stock Keeping Unit): a unique product type (e.g., RED-CHAIR)
- Batch: a quantity of a SKU arriving on a specific date
- Order Line: a single line on a customer's order (SKU + quantity)
- Allocate: link an order line to a batch, reducing available stock
Unit Testing Domain Models
# Domain model tests are pure Python - no DB, no frameworks
def test_allocating_to_a_batch_reduces_the_available_quantity():
    batch = Batch("batch-001", "SMALL-TABLE", qty=20, eta=date.today())
    line = OrderLine("order-ref", "SMALL-TABLE", 2)
    batch.allocate(line)
    assert batch.available_quantity == 18
- Tests use plain Python objects — no ORM, no framework
- Business rules are expressed as simple unit tests
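The Batch and OrderLine used in the test above are never defined in these notes; a sketch close to the book's model.py (details may differ from the book's exact code):

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional

@dataclass(frozen=True)
class OrderLine:
    orderid: str
    sku: str
    qty: int

class Batch:
    def __init__(self, ref: str, sku: str, qty: int, eta: Optional[date]):
        self.reference = ref
        self.sku = sku
        self.eta = eta
        self._purchased_quantity = qty
        self._allocations = set()  # set of OrderLine

    def allocate(self, line: OrderLine) -> None:
        if self.can_allocate(line):
            self._allocations.add(line)

    def can_allocate(self, line: OrderLine) -> bool:
        return self.sku == line.sku and self.available_quantity >= line.qty

    @property
    def allocated_quantity(self) -> int:
        return sum(line.qty for line in self._allocations)

    @property
    def available_quantity(self) -> int:
        return self._purchased_quantity - self.allocated_quantity

# The unit test from the notes passes against this sketch:
batch = Batch("batch-001", "SMALL-TABLE", qty=20, eta=date.today())
batch.allocate(OrderLine("order-ref", "SMALL-TABLE", 2))
assert batch.available_quantity == 18
```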
Dataclasses Are Great for Value Objects
- Value Object: defined by its attributes, has no persistent identity, is interchangeable with other instances with same values
- Use @dataclass(frozen=True) for immutability
from dataclasses import dataclass

@dataclass(frozen=True)
class OrderLine:
    orderid: str
    sku: str
    qty: int
Entities vs Value Objects
|            | Entity                    | Value Object                   |
|------------|---------------------------|--------------------------------|
| Identity   | Has a persistent identity | Defined entirely by values     |
| Equality   | By reference/ID           | By value (structural equality) |
| Mutability | Typically mutable         | Preferably immutable           |
| Example    | Batch (has a reference)   | OrderLine (no unique identity) |
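The distinction shows up directly in Python: a frozen dataclass gets structural equality for free, while an entity defines equality and hashing by its identity. A sketch in the spirit of the book's Batch.__eq__ (simplified here to just the reference):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class OrderLine:  # value object: equality by value, for free
    orderid: str
    sku: str
    qty: int

class Batch:  # entity: equality and hashing follow the identity (reference)
    def __init__(self, ref: str):
        self.reference = ref

    def __eq__(self, other):
        if not isinstance(other, Batch):
            return False
        return other.reference == self.reference

    def __hash__(self):
        return hash(self.reference)

assert OrderLine("o1", "RED-CHAIR", 2) == OrderLine("o1", "RED-CHAIR", 2)
assert Batch("batch-001") == Batch("batch-001")  # distinct objects, same identity
```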
Domain Services
- When a piece of logic doesn't naturally belong to any existing entity/value object, it becomes a domain service function
- Example: allocate() is a standalone function in model.py, not a method on either Order or Batch
# Domain service function (src/allocation/domain/model.py)
def allocate(line: OrderLine, batches: List[Batch]) -> str:
    try:
        batch = next(b for b in sorted(batches) if b.can_allocate(line))
    except StopIteration:
        raise OutOfStock(f"Out of stock for sku {line.sku}")
    batch.allocate(line)
    return batch.reference
Exceptions as Domain Concepts
class OutOfStock(Exception):
    pass
- Domain exceptions are part of the ubiquitous language
Chapter 2: Repository Pattern
The Normal ORM Way: Model Depends on ORM
- Typical approach: define your model classes inheriting from the ORM base class (e.g., django.db.models.Model)
- Problem: your domain model is now coupled to the ORM — the most important layer depends on infrastructure
Inverting the Dependency: ORM Depends on Model
- Key library: SQLAlchemy (specifically its classical / imperative mapping)
- Define your domain model as plain Python classes first, then tell SQLAlchemy how to map them
# orm.py — SQLAlchemy imperative/classical mapping
from sqlalchemy.orm import mapper, relationship

from domain import model

def start_mappers():
    lines_mapper = mapper(model.OrderLine, order_lines)  # order_lines is a Table object
    mapper(model.Batch, batches, properties={
        '_allocations': relationship(lines_mapper, secondary=allocations)
    })
- This keeps the domain model ignorant of persistence — it's just plain Python classes
- The ORM imports the model, not the other way around
Introducing the Repository Pattern
- Repository: an abstraction over persistent storage; it pretends all data is in memory
- Simplest possible interface: .add() and .get()
import abc

from domain import model

class AbstractRepository(abc.ABC):
    @abc.abstractmethod
    def add(self, batch: model.Batch):
        raise NotImplementedError

    @abc.abstractmethod
    def get(self, reference) -> model.Batch:
        raise NotImplementedError

class SqlAlchemyRepository(AbstractRepository):
    def __init__(self, session):
        self.session = session

    def add(self, batch):
        self.session.add(batch)

    def get(self, reference):
        return self.session.query(model.Batch).filter_by(
            reference=reference
        ).one()

    def list(self):
        return self.session.query(model.Batch).all()
Building a Fake Repository for Tests
class FakeRepository(AbstractRepository):
    def __init__(self, batches):
        self._batches = set(batches)

    def add(self, batch):
        self._batches.add(batch)

    def get(self, reference):
        return next(b for b in self._batches if b.reference == reference)

    def list(self):
        return list(self._batches)
- Fake repos enable fast, isolated unit tests with no DB needed
Trade-Offs
| Pros                                     | Cons                                                             |
|------------------------------------------|------------------------------------------------------------------|
| Simple interface for storage             | Extra layer of abstraction (the "ORM is already an abstraction" objection) |
| Easy to swap infra via fakes for testing | ORM mapping requires extra setup                                 |
| Writes are decoupled from reads          | Another thing for devs to learn                                  |
Chapter 3: A Brief Interlude: On Coupling and Abstractions
Abstracting State Aids Testability
- Problem: how do you test code that has side effects (file I/O, HTTP calls, DB writes)?
- Three approaches:
- Mock everything (fragile, couples tests to implementation)
- Edge-to-edge testing (integration/E2E — slow, complex)
- Abstract away side effects behind simple interfaces (the book's preferred approach)
The "Ports and Adapters" / Hexagonal Architecture
- Business logic at the core, infrastructure at the edges
- Port: an abstract interface (e.g., AbstractRepository)
- Adapter: a concrete implementation (e.g., SqlAlchemyRepository, FakeRepository)
Functional Core, Imperative Shell
- Alternative framing: keep the core logic pure (no side effects), push I/O to the shell
- Not always achievable in Python, but a useful ideal
Choosing the Right Abstraction
- Good abstractions simplify testing, aid in reasoning about code
- Bad abstractions add complexity without reducing coupling
- Rule of thumb: introduce an abstraction when you have a concrete reason (e.g., testability, swappability)
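The book's worked example in this chapter is a file-sync tool whose decision logic is a pure function over two {hash: filename} dicts, so it needs no real filesystem to test. A condensed sketch (simplified from the book; paths are plain strings here):

```python
# Pure "what to do" core: compares source and destination by content hash
# and yields the actions the imperative shell should perform.
def determine_actions(src_hashes, dst_hashes, src_folder, dst_folder):
    for sha, filename in src_hashes.items():
        if sha not in dst_hashes:
            # file exists only in source: copy it over
            yield "COPY", f"{src_folder}/{filename}", f"{dst_folder}/{filename}"
        elif dst_hashes[sha] != filename:
            # same content, different name: rename at the destination
            yield "MOVE", f"{dst_folder}/{dst_hashes[sha]}", f"{dst_folder}/{filename}"
    for sha, filename in dst_hashes.items():
        if sha not in src_hashes:
            # file no longer in source: delete it
            yield "DELETE", f"{dst_folder}/{filename}"

actions = list(determine_actions({"h1": "a.txt"}, {}, "/src", "/dst"))
assert actions == [("COPY", "/src/a.txt", "/dst/a.txt")]
```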
Chapter 4: Our First Use Case: Flask API and Service Layer
Connecting Our Application to the Real World
- Flask is the web framework used throughout the book
- Entrypoint (Flask route) should be thin — it just translates HTTP requests into domain operations
A Typical Service Function
# service_layer/services.py
def allocate(orderid: str, sku: str, qty: int, repo: AbstractRepository, session) -> str:
    batches = repo.list()
    if not is_valid_sku(sku, batches):
        raise InvalidSku(f"Invalid sku {sku}")
    batchref = model.allocate(OrderLine(orderid, sku, qty), batches)
    session.commit()
    return batchref
- Service layer (aka use-case layer): orchestrates a workflow
  - Fetches data from repositories
  - Calls domain model methods
  - Commits changes
- It doesn't contain business logic — it coordinates it
The Service Layer and the Flask Endpoint
# entrypoints/flask_app.py
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
    session = get_session()
    repo = repository.SqlAlchemyRepository(session)
    try:
        batchref = services.allocate(
            request.json["orderid"],
            request.json["sku"],
            request.json["qty"],
            repo, session,
        )
    except (model.OutOfStock, services.InvalidSku) as e:
        return jsonify({"message": str(e)}), 400
    return jsonify({"batchref": batchref}), 201
Why Is Everything Called a Service?
- Domain service: pure logic that doesn't belong on an entity (e.g., model.allocate())
- Service layer / application service: orchestration — fetches objects, invokes domain, persists results
- Infrastructure service: talks to external systems (email, file systems)
Testing the Service Layer with Fakes
# Unit tests use FakeRepository — no database needed
def test_returns_allocation():
    repo = FakeRepository([
        Batch("b1", "COMPLICATED-LAMP", 100, eta=None),
    ])
    result = services.allocate("o1", "COMPLICATED-LAMP", 10, repo, FakeSession())
    assert result == "b1"
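The FakeSession used above isn't shown in these notes; a minimal version just needs to record whether commit() was called:

```python
# Minimal fake session: the service layer only ever calls .commit() on it,
# so all the test double has to do is remember that it happened.
class FakeSession:
    committed = False

    def commit(self):
        self.committed = True

session = FakeSession()
session.commit()
assert session.committed
```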
Chapter 5: TDD in High Gear and Low Gear
Test Pyramid / Test Spectrum
- Unit tests (domain model): fast, isolated, business-rule focused
- Service-layer tests: test use cases, use fakes for infra
- E2E tests (against Flask + real DB): slow but validate integration
Rules of Thumb for Test Types
- Write a few E2E tests to prove the plumbing works
- Write the bulk of your tests against the service layer (using fakes)
- Write unit tests for complex domain logic
- Aim to test behavior, not implementation
High Gear vs. Low Gear
- Low gear: testing the domain model directly when building/debugging complex logic
- High gear: testing via the service layer once the API is stable
- Moving tests up to the service layer makes them less coupled to implementation — easier to refactor
Fully Decoupling the Service-Layer Tests from the Domain
- Ideal: service-layer tests only use domain primitives (strings, ints) — never import domain classes
- This makes it easier to change the domain model without rewriting all tests
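A self-contained illustration of the primitives-only idea (hypothetical mini service, not the book's code): the test below touches only strings and ints, so the domain classes behind the service can change shape freely.

```python
# Hypothetical in-memory "service layer" keyed entirely by primitives.
_stock = {}  # sku -> available qty

def add_batch(sku: str, qty: int) -> None:
    _stock[sku] = _stock.get(sku, 0) + qty

def allocate(sku: str, qty: int) -> bool:
    if _stock.get(sku, 0) < qty:
        return False
    _stock[sku] -= qty
    return True

# Test in "primitives only" style: no domain objects imported or constructed.
add_batch("CRUNCHY-ARMCHAIR", 100)
assert allocate("CRUNCHY-ARMCHAIR", 10) is True
assert allocate("NONEXISTENT-SKU", 1) is False
```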
Chapter 6: Unit of Work Pattern
The Unit of Work Collaborates with the Repository
- Unit of Work (UoW): abstracts the concept of an atomic operation / database transaction
- Provides a single entrypoint to repositories and handles commit() / rollback()
class AbstractUnitOfWork(abc.ABC):
    products: repository.AbstractRepository

    def __enter__(self) -> "AbstractUnitOfWork":
        return self

    def __exit__(self, *args):
        self.rollback()

    @abc.abstractmethod
    def commit(self):
        raise NotImplementedError

    @abc.abstractmethod
    def rollback(self):
        raise NotImplementedError
Real and Fake UoW
# Real UoW
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
        self.session_factory = session_factory

    def __enter__(self):
        self.session = self.session_factory()
        self.products = repository.SqlAlchemyRepository(self.session)
        return super().__enter__()

    def __exit__(self, *args):
        super().__exit__(*args)
        self.session.close()

    def commit(self):
        self.session.commit()

    def rollback(self):
        self.session.rollback()

# Fake UoW for tests
class FakeUnitOfWork(AbstractUnitOfWork):
    def __init__(self):
        self.products = FakeRepository([])
        self.committed = False

    def commit(self):
        self.committed = True

    def rollback(self):
        pass
Using UoW in Service Layer
def allocate(orderid: str, sku: str, qty: int,
             uow: unit_of_work.AbstractUnitOfWork) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:  # context manager handles the transaction
        product = uow.products.get(sku=sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {sku}")
        batchref = product.allocate(line)
        uow.commit()  # explicit commit
    return batchref
- The with block ensures rollback on failure (via __exit__)
- Explicit commit() makes it clear when a transaction succeeds
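A self-contained demonstration of the rollback-by-default behavior (not the book's code, just the context-manager mechanics in isolation):

```python
# Minimal UoW-shaped context manager: __exit__ always calls rollback,
# so anything that raises before the explicit commit() is undone.
class TrackingUoW:
    def __init__(self):
        self.committed = False
        self.rolled_back = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.rollback()  # always called; a real session treats rollback after commit as a no-op

    def commit(self):
        self.committed = True

    def rollback(self):
        self.rolled_back = True

uow = TrackingUoW()
try:
    with uow:
        raise RuntimeError("boom")  # failure before commit
except RuntimeError:
    pass
assert not uow.committed and uow.rolled_back
```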
Chapter 7: Aggregates and Consistency Boundaries
Why Do We Need Aggregates?
- An aggregate is a cluster of domain objects treated as a single unit for data changes
- It acts as a consistency boundary: all invariants within the aggregate are guaranteed to be consistent after each operation
- Operations across aggregates accept eventual consistency
Choosing an Aggregate
- In the allocation example, Product becomes the aggregate root
- A Product contains its Batches — you always go through the Product to allocate
class Product:
    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
        except StopIteration:
            raise OutOfStock(f"Out of stock for sku {line.sku}")
        batch.allocate(line)
        self.version_number += 1
        return batch.reference
Optimistic Concurrency with Version Numbers
- version_number is incremented on each change to detect concurrent modifications
- Can be implemented with SQLAlchemy's version_id_col feature
-- Optimistic lock at DB level
UPDATE products SET version_number=:new
WHERE sku=:sku AND version_number=:old
- If two transactions try to modify the same Product concurrently, one will fail (0 rows updated) and retry
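The zero-rows-updated check is easy to demonstrate with sqlite3 (a hypothetical demo, not the book's code): an UPDATE guarded by the old version number matches nothing if another writer committed first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (sku TEXT, version_number INTEGER)")
conn.execute("INSERT INTO products VALUES ('RED-CHAIR', 3)")

# First writer wins: bumps version 3 -> 4
cur = conn.execute(
    "UPDATE products SET version_number=? WHERE sku=? AND version_number=?",
    (4, "RED-CHAIR", 3),
)
assert cur.rowcount == 1

# Second writer still holds version 3: zero rows updated, so it must retry
cur = conn.execute(
    "UPDATE products SET version_number=? WHERE sku=? AND version_number=?",
    (4, "RED-CHAIR", 3),
)
assert cur.rowcount == 0
```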
One Aggregate = One Repository
- Each aggregate type gets its own repository
- The repository fetches the entire aggregate (root + children) and saves it as a unit
- Aggregates reference each other only by identity (e.g., a sku string), not by direct object reference
Chapter 8: Events and the Message Bus
Events
- Domain Event: a data structure representing something that happened in the domain
- Events are simple dataclasses
from dataclasses import dataclass

class Event:
    pass

@dataclass
class OutOfStock(Event):
    sku: str

@dataclass
class Allocated(Event):
    orderid: str
    sku: str
    qty: int
    batchref: str
The Model Raises Events
- Domain objects collect events on an internal list; they don't dispatch them directly
- Events are harvested and dispatched after the transaction commits
class Product:
    def __init__(self, sku, batches, version_number=0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # list of domain events

    def allocate(self, line):
        try:
            batch = next(...)
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))
            return None
        ...
The Message Bus Maps Events to Handlers
# service_layer/messagebus.py
HANDLERS = {
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

def handle(event: events.Event, uow: AbstractUnitOfWork):
    for handler in HANDLERS[type(event)]:
        handler(event, uow=uow)
The UoW Publishes Events to the Message Bus
class AbstractUnitOfWork(abc.ABC):
    def collect_new_events(self):
        for product in self.products.seen:
            while product.events:
                yield product.events.pop(0)
- products.seen tracks all aggregates loaded during this UoW
- After commit, the service layer calls messagebus.handle() for each collected event
Chapter 9: Going to Town on the Message Bus
A New Architecture: Everything Is an Event Handler
- Refactors the entire application so that all use cases are event handlers
- The Flask endpoint just creates an event and passes it to the message bus
- The message bus becomes the single entrypoint for the service layer
# Flask entrypoint
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
    try:
        event = events.AllocationRequired(
            request.json["orderid"], request.json["sku"], request.json["qty"]
        )
        results = messagebus.handle(event, uow)
        batchref = results.pop(0)
    except InvalidSku as e:
        return jsonify({"message": str(e)}), 400
    return jsonify({"batchref": batchref}), 201
Test-Driving a New Handler
- New use case: a BatchQuantityChanged event triggers reallocation if needed
- Test is end-to-end through the message bus: send events, assert outcomes
Optionally: Unit Testing Event Handlers in Isolation with a Fake Message Bus
class FakeUnitOfWorkWithFakeMessageBus(FakeUnitOfWork):
    def __init__(self):
        super().__init__()
        self.events_published = []

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                self.events_published.append(product.events.pop(0))
- Allows testing individual handlers without triggering the full chain
- Use edge-to-edge tests first; resort to isolated handler tests only for complex chains
Chapter 10: Commands and Command Handler
Commands and Events
|                | Event                     | Command                      |
|----------------|---------------------------|------------------------------|
| Named          | Past tense (BatchCreated) | Imperative mood (CreateBatch) |
| Error handling | Fail independently        | Fail noisily                 |
| Sent to        | All listeners (broadcast) | One recipient (directed)     |
- Commands capture intent; Events capture facts
- Commands are sent by one actor to another with expectation of result
- Events are broadcast — sender doesn't know/care who handles them
class Command:
    pass

@dataclass
class Allocate(Command):
    orderid: str
    sku: str
    qty: int

@dataclass
class CreateBatch(Command):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None
Differences in Exception Handling
# messagebus.py
Message = Union[commands.Command, events.Event]

def handle(message: Message, uow):
    results = []
    queue = [message]
    while queue:
        message = queue.pop(0)
        if isinstance(message, events.Event):
            handle_event(message, queue, uow)
        elif isinstance(message, commands.Command):
            cmd_result = handle_command(message, queue, uow)
            results.append(cmd_result)
    return results

def handle_event(event, queue, uow):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception("Exception handling event %s", event)
            continue  # events: log and continue

def handle_command(command, queue, uow):
    handler = COMMAND_HANDLERS[type(command)]  # exactly one handler
    try:
        result = handler(command, uow=uow)
        queue.extend(uow.collect_new_events())
        return result
    except Exception:
        logger.exception("Exception handling command %s", command)
        raise  # commands: raise to caller
- Events: catch exceptions, log, continue (event failures are isolated)
- Commands: let exceptions bubble up (the caller needs to know it failed)
Recovering from Errors Synchronously
- Library: tenacity — a Python library for retry logic with exponential backoff
from tenacity import Retrying, RetryError, stop_after_attempt, wait_exponential

def handle_event(event, queue, uow):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            for attempt in Retrying(
                stop=stop_after_attempt(3),
                wait=wait_exponential()
            ):
                with attempt:
                    handler(event, uow=uow)
                    queue.extend(uow.collect_new_events())
        except RetryError as retry_failure:
            logger.error(
                "Failed to handle event %s times, giving up!",
                retry_failure.last_attempt.attempt_number
            )
            continue
Chapter 11: Event-Driven Architecture: Using Events to Integrate Microservices
Distributed Ball of Mud, and Thinking in Nouns
- Anti-pattern: splitting a system into microservices by nouns (Orders, Batches, Warehouse) and using synchronous HTTP calls between them
- This creates temporal coupling: every part must be online at the same time
- Connascence: a taxonomy for types of coupling. Events reduce Connascence of Execution/Timing to Connascence of Name (only need to agree on event names/fields)
The Alternative: Temporal Decoupling Using Asynchronous Messaging
- Think in terms of verbs (ordering, allocating), not nouns (orders, batches)
- Microservices should be consistency boundaries (like aggregates)
- Use asynchronous messaging (events) to integrate services — accept eventual consistency
Using a Redis Pub/Sub Channel for Integration
- Message broker: infrastructure that routes messages between services (e.g., Redis pub/sub, RabbitMQ, Kafka, Amazon EventBridge)
- The book uses Redis pub/sub as a lightweight example
# Redis event consumer (entrypoint) — src/allocation/entrypoints/redis_eventconsumer.py
r = redis.Redis(**config.get_redis_host_and_port())

def main():
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe("change_batch_quantity")  # listen on channel
    for m in pubsub.listen():
        handle_change_batch_quantity(m)

def handle_change_batch_quantity(m):
    data = json.loads(m["data"])
    cmd = commands.ChangeBatchQuantity(ref=data["batchref"], qty=data["qty"])
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())

# Redis event publisher — src/allocation/adapters/redis_eventpublisher.py
def publish(channel, event: events.Event):
    r.publish(channel, json.dumps(asdict(event)))
Internal Versus External Events
- Keep a clear distinction: not all internal events should be published externally
- Outbound events are a place where validation is especially important
Chapter 12: Command-Query Responsibility Segregation (CQRS)
Domain Models Are for Writing
- All the patterns in the book (aggregates, UoW, domain events) exist to enforce rules during writes
- Reads have very different requirements: simpler logic, higher throughput, staleness is OK
Most Users Aren't Going to Buy Your Furniture
- Reads vastly outnumber writes in most systems
- Reads can be eventually consistent — trade consistency for performance
Post/Redirect/Get and CQS
- CQS (Command-Query Separation): functions should either modify state OR answer questions, never both
- Separate your POST (write) and GET (read) endpoints
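A tiny CQS illustration (not from the book): one method mutates state and returns nothing, the other answers a question and never mutates.

```python
# Hypothetical example of Command-Query Separation on a single object.
class Tray:
    def __init__(self):
        self._items = []

    def add(self, item) -> None:   # command: changes state, returns nothing
        self._items.append(item)

    def count(self) -> int:        # query: answers, never mutates
        return len(self._items)

tray = Tray()
tray.add("RED-CHAIR")
assert tray.count() == 1
```

A method like `pop_and_return()` would violate CQS by doing both at once, which is exactly what makes such APIs hard to reason about.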
Hold On to Your Lunch, Folks
- Simplest CQRS: use raw SQL for reads, bypassing the domain model entirely
# views.py — raw SQL read model
def allocations(orderid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
    with uow:
        results = list(uow.session.execute(
            'SELECT ol.sku, b.reference'
            ' FROM allocations AS a'
            ' JOIN batches AS b ON a.batch_id = b.id'
            ' JOIN order_lines AS ol ON a.orderline_id = ol.id'
            ' WHERE ol.orderid = :orderid',
            dict(orderid=orderid)
        ))
    return [{"sku": sku, "batchref": batchref} for sku, batchref in results]
Alternatives for Read Models
| Option                       | Pros                                          | Cons                                     |
|------------------------------|-----------------------------------------------|------------------------------------------|
| Use repositories             | Simple, consistent approach                   | Performance issues with complex queries  |
| Custom ORM queries           | Reuse DB config and model definitions         | Adds query-language complexity           |
| Hand-rolled SQL              | Fine control over performance                 | Schema changes affect both ORM and SQL   |
| Separate read store (events) | Read-only copies scale out; queries are simple | Complex technique; eventual consistency |
Updating a Read Model Table Using an Event Handler
# Build a denormalized read-model table, updated by event handlers
EVENT_HANDLERS = {
    events.Allocated: [
        handlers.publish_allocated_event,
        handlers.add_allocation_to_read_model,  # updates the read table
    ],
}

def add_allocation_to_read_model(event: events.Allocated, uow):
    with uow:
        uow.session.execute(
            'INSERT INTO allocations_view (orderid, sku, batchref)'
            ' VALUES (:orderid, :sku, :batchref)',
            dict(orderid=event.orderid, sku=event.sku, batchref=event.batchref)
        )
        uow.commit()
Changing Our Read Model Implementation Is Easy
- Can swap from SQL table to Redis with minimal changes
- Integration tests still pass because they test through the message bus, not the storage backend
# Redis-backed read model
def add_allocation_to_read_model(event: events.Allocated, _):
    redis_eventpublisher.update_readmodel(event.orderid, event.sku, event.batchref)

# In redis_eventpublisher.py
def update_readmodel(orderid, sku, batchref):
    r.hset(orderid, sku, batchref)

def get_readmodel(orderid):
    return r.hgetall(orderid)
Chapter 13: Dependency Injection (and Bootstrapping)
Implicit Versus Explicit Dependencies
- Explicit: handler declares uow: AbstractUnitOfWork as a parameter — easy to swap in tests
- Implicit: handler does from allocation.adapters import email — tied to the implementation; requires mock.patch in tests
# Explicit (preferred) — dependency is a parameter
def send_out_of_stock_notification(event: events.OutOfStock, send_mail: Callable):
    send_mail('stock@made.com', f'Out of stock for {event.sku}')

# Implicit (less preferred) — dependency is a hardcoded import
from allocation.adapters import email

def send_out_of_stock_notification(event: events.OutOfStock, uow):
    email.send('stock@made.com', f'Out of stock for {event.sku}')
- Mocks tightly couple tests to implementation; explicit DI couples them to abstractions
Preparing Handlers: Manual DI with Closures and Partials
# Using closures / functools.partial to inject dependencies
import functools
def bootstrap(...):
    uow = unit_of_work.SqlAlchemyUnitOfWork()
    # closure approach
    allocate_composed = lambda cmd: allocate(cmd, uow)
    # or, equivalently, with a partial
    allocate_composed = functools.partial(allocate, uow=uow)
- Closures use late binding (can cause surprises with mutable deps)
- functools.partial uses early binding — generally safer
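The late-binding surprise is easy to demonstrate (generic example, hypothetical names): a lambda captures the loop *variable*, while a partial captures the *value* at composition time.

```python
import functools

handlers = []
partials = []
for dep in ["uow_a", "uow_b"]:
    handlers.append(lambda: dep)                          # late binding: closure over the variable
    partials.append(functools.partial(lambda d: d, dep))  # early binding: value frozen now

# After the loop, every closure sees the final value of `dep`...
assert [h() for h in handlers] == ["uow_b", "uow_b"]
# ...but each partial kept the value it was composed with.
assert [p() for p in partials] == ["uow_a", "uow_b"]
```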
An Alternative Using Classes
class AllocateHandler:
    def __init__(self, uow: unit_of_work.AbstractUnitOfWork):
        self.uow = uow

    def __call__(self, cmd: commands.Allocate):
        line = OrderLine(cmd.orderid, cmd.sku, cmd.qty)
        with self.uow:
            ...  # handler body
A Bootstrap Script
# src/allocation/bootstrap.py
def bootstrap(
    start_orm: bool = True,
    uow: unit_of_work.AbstractUnitOfWork = unit_of_work.SqlAlchemyUnitOfWork(),
    send_mail: Callable = email.send,
    publish: Callable = redis_eventpublisher.publish,
) -> messagebus.MessageBus:
    if start_orm:
        orm.start_mappers()
    dependencies = {"uow": uow, "send_mail": send_mail, "publish": publish}
    injected_event_handlers = {
        event_type: [
            inject_dependencies(handler, dependencies)
            for handler in event_handlers
        ]
        for event_type, event_handlers in handlers.EVENT_HANDLERS.items()
    }
    # ... same for command handlers
    return messagebus.MessageBus(
        uow=uow,
        event_handlers=injected_event_handlers,
        command_handlers=injected_command_handlers,
    )

def inject_dependencies(handler, dependencies):
    params = inspect.signature(handler).parameters
    deps = {
        name: dependency
        for name, dependency in dependencies.items()
        if name in params
    }
    return lambda message: handler(message, **deps)
- inspect.signature matches handler params to the dependencies dict
- Keeps all initialization in one place — the composition root
Initializing DI in Our Tests
# Integration test bootstrap
@pytest.fixture
def sqlite_bus(sqlite_session_factory):
    bus = bootstrap.bootstrap(
        start_orm=True,
        uow=unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory),
        send_mail=lambda *args: None,  # no-op
        publish=lambda *args: None,
    )
    yield bus
    clear_mappers()

# Unit test bootstrap
def bootstrap_test_app():
    return bootstrap.bootstrap(
        start_orm=False,
        uow=FakeUnitOfWork(),
        send_mail=lambda *args: None,
        publish=lambda *args: None,
    )
Building an Adapter "Properly": A Worked Example
- Define the abstract interface (ABC)
- Implement the real thing (e.g., EmailNotifications using smtplib)
- Build a fake for unit/service-layer tests (e.g., FakeNotifications)
- Find a "less fake" version for integration tests (e.g., MailHog in Docker)
- Integration-test against the less-fake version
# Abstract
class AbstractNotifications(abc.ABC):
    @abc.abstractmethod
    def send(self, destination, message):
        raise NotImplementedError

# Real
class EmailNotifications(AbstractNotifications):
    def __init__(self, smtp_host=DEFAULT_HOST, port=DEFAULT_PORT):
        self.server = smtplib.SMTP(smtp_host, port=port)
        self.server.noop()

    def send(self, destination, message):
        msg = f"Subject: allocation service notification\n{message}"
        self.server.sendmail(
            from_addr="allocations@example.com",
            to_addrs=[destination],
            msg=msg,
        )

# Fake (for unit tests)
class FakeNotifications(AbstractNotifications):
    def __init__(self):
        self.sent = defaultdict(list)  # type: Dict[str, List[str]]

    def send(self, destination, message):
        self.sent[destination].append(message)
DI Framework Mentions
- Inject — used at MADE.com; works fine but annoys Pylint
- Punq — written by Bob Gregory
- dependencies — by the DRY-Python crew
- For most cases, manual DI (lambdas/partials + bootstrap script) is sufficient
Message Bus Is Given Handlers at Runtime
class MessageBus:
    def __init__(self, uow, event_handlers, command_handlers):
        self.uow = uow
        self.event_handlers = event_handlers
        self.command_handlers = command_handlers

    def handle(self, message: Message):
        self.queue = [message]
        while self.queue:
            message = self.queue.pop(0)
            if isinstance(message, events.Event):
                self.handle_event(message)
            elif isinstance(message, commands.Command):
                self.handle_command(message)
            else:
                raise Exception(f"{message} was not an Event or Command")
Epilogue
How Do I Get There from Here?
- You don't need a greenfield project — these patterns can be incrementally adopted in existing codebases
- Link refactoring to feature work to justify the investment ("architecture tax")
Separating Entangled Responsibilities
- Identify use cases — give each an imperative name (Apply Billing Charges, Create Workspace)
- Create a single function/class per use case that orchestrates the work
- Pull data access and I/O out of the domain model and into use-case functions
Identifying Aggregates and Bounded Contexts
- Break apart your object graph by replacing direct object references with IDs
- Bidirectional links are a code smell — they suggest aggregate boundaries are wrong
- For reads, replace ORM loops with raw SQL (first step toward CQRS)
- For writes, use message bus + events to coordinate between aggregates
An Event-Driven Approach to Go to Microservices via Strangler Pattern
- Strangler Fig pattern: wrap a new system around the edges of the old one, gradually replacing functionality
- Event interception:
- Raise events in the old system
- Build a new system that consumes those events
- Replace the old system
Convincing Your Stakeholders to Try Something New
- Use domain modeling (event storming, CRC cards, event modeling) to align engineers and business
- Treat domain problems as TDD katas — start small, demonstrate value
- Reliable messaging is hard: Redis pub/sub is not a production message broker; consider RabbitMQ, Kafka, Amazon EventBridge
- Small, focused transactions: design operations to fail independently
- Idempotency: handlers should be safe to call repeatedly with the same message
- Event schema evolution: document and version your events (JSON schema + markdown)
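A minimal idempotency sketch (hypothetical, not from the book): the handler remembers processed message IDs, so a redelivered message is a no-op.

```python
# Hypothetical handler state: in production this dedup set would live in
# durable storage, inside the same transaction as the state change.
processed_ids = set()
balance = {"total": 0}

def handle_payment(message_id: str, amount: int) -> None:
    if message_id in processed_ids:  # duplicate delivery: safely ignore
        return
    processed_ids.add(message_id)
    balance["total"] += amount

handle_payment("msg-1", 10)
handle_payment("msg-1", 10)  # redelivered by the broker
assert balance["total"] == 10
```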
Appendix A: Summary Diagram and Table
Component Reference
| Layer          | Component            | Description                                                 |
|----------------|----------------------|-------------------------------------------------------------|
| Domain         | Entity               | Object with identity that may change over time              |
|                | Value Object         | Immutable, defined entirely by its attributes               |
|                | Aggregate            | Cluster of objects treated as a unit; consistency boundary  |
|                | Event                | Dataclass representing something that happened              |
|                | Command              | Dataclass representing a job the system should do           |
| Service Layer  | Handler              | Receives a command/event and orchestrates the response      |
|                | Unit of Work         | Abstraction around atomic DB operations                     |
|                | Message Bus          | Routes commands/events to their handlers                    |
| Adapters       | Repository           | Abstraction around persistent storage                       |
|                | Event Publisher      | Pushes events to the external message bus                   |
| Entrypoints    | Web (Flask)          | Translates HTTP requests → commands                         |
|                | Event Consumer       | Translates external messages → commands                     |
| Infrastructure | External Message Bus | Message broker (Redis, Kafka, etc.) for inter-service communication |
Appendix B: A Template Project Structure
Project Layout
.
├── Dockerfile
├── Makefile
├── docker-compose.yml
├── mypy.ini
├── requirements.txt
├── src
│ ├── allocation
│ │ ├── __init__.py
│ │ ├── adapters/ (orm.py, repository.py)
│ │ ├── config.py
│ │ ├── domain/ (model.py, events.py, commands.py)
│ │ ├── entrypoints/ (flask_app.py, redis_eventconsumer.py)
│ │ └── service_layer/ (handlers.py, messagebus.py, unit_of_work.py)
│ └── setup.py
└── tests
├── conftest.py
├── e2e/ (test_api.py)
├── integration/ (test_orm.py, test_repository.py)
├── pytest.ini
└── unit/ (test_batches.py, test_handlers.py)
Key Config Patterns
- config.py: functions (not constants) that read from os.environ with sensible dev defaults
- docker-compose.yml: define services, set env vars, mount volumes for dev hot-reload
- setup.py: minimal pip install -e setup for the src package
- PYTHONDONTWRITEBYTECODE=1: prevents .pyc clutter when mounting volumes in Docker
- Library: environ-config — elegant environment-based config in Python
Appendix C: Swapping Out the Infrastructure: Do Everything with CSVs
Key Lesson
- Because the domain model and service layer are decoupled from infrastructure, you can swap Postgres for CSV files by implementing new CsvRepository and CsvUnitOfWork classes
- All domain logic and service-layer orchestration is reused unchanged
class CsvRepository(repository.AbstractRepository):
    def __init__(self, folder):
        self._batches_path = Path(folder) / "batches.csv"
        self._allocations_path = Path(folder) / "allocations.csv"
        self._batches = {}
        self._load()

    def get(self, reference):
        return self._batches.get(reference)

    def add(self, batch):
        self._batches[batch.reference] = batch

class CsvUnitOfWork(unit_of_work.AbstractUnitOfWork):
    def __init__(self, folder):
        self.batches = CsvRepository(folder)

    def commit(self):
        # write allocations back to CSV
        ...

    def rollback(self):
        pass
Appendix D: Repository and Unit of Work Patterns with Django
(Brief overview — see the book for full Django-specific code)
- Django's ORM uses the Active Record pattern (model = table row), unlike SQLAlchemy's Data Mapper
- You can still apply Repository and UoW patterns on top of Django, though it requires more effort
- The key trick: use Django's model_to_dict and manual mapping to keep your domain model separate from Django models
- Django's transaction.atomic() can serve as the UoW's commit() boundary
Appendix E: Validation
Three Types of Validation
- Syntax validation: is the input well-formed? (correct types, required fields present)
- Semantics validation: does the input make sense? (is this a real SKU? is quantity positive?)
- Pragmatic validation: can we do what's being asked? (is there enough stock?)
Where to Validate
- Syntax: at the edge — in the entrypoint or on the message/command itself
- Semantics: in the domain model or handlers (e.g., checking a SKU exists)
- Pragmatic: deep in the domain model (business rules like "can't allocate more than available")
The Ensure Pattern
# Validation at command creation
@dataclass
class Allocate(Command):
    orderid: str
    sku: str
    qty: int

    def __post_init__(self):
        if not self.orderid:
            raise ValueError("orderid is required")
        if self.qty <= 0:
            raise ValueError("qty must be positive")
- Validate as early as possible
- Commands/events should be valid by construction
- Don't let invalid data propagate through the system
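Exercising the command above (restated self-contained here, without the Command base class): construction fails fast, so invalid data never gets past the edge.

```python
from dataclasses import dataclass

@dataclass
class Allocate:
    orderid: str
    sku: str
    qty: int

    def __post_init__(self):
        if not self.orderid:
            raise ValueError("orderid is required")
        if self.qty <= 0:
            raise ValueError("qty must be positive")

Allocate("o1", "RED-CHAIR", 2)       # valid: constructs fine
try:
    Allocate("o1", "RED-CHAIR", 0)   # invalid qty: never exists as an object
except ValueError as err:
    assert "positive" in str(err)
```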