This post is an answer to a StackOverflow question. You can find the code used in this example here.
Package structure
In a component package I use these modules as a starting point:
- migrations/
- app.py
- commands.py
- events.py
- exceptions.py
- repository.py
- service.py
- entity.py
Commands
A command is always a DTO and is as specific as it can be from a domain perspective. I aim to create a separate class for each command so that I can dispatch handlers by command class.
    from dataclasses import dataclass, field
    from datetime import datetime
    from uuid import uuid1

    @dataclass
    class Create(Command):
        command_id: CommandID = field(default_factory=uuid1)
        timestamp: datetime = field(default_factory=datetime.utcnow)
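To illustrate why one class per command is convenient, here is a minimal sketch of dispatching by command class with `functools.singledispatch`. The `Command` base class, the `UpdateValue` fields, and the string return values are assumptions made for this illustration only; they are not the definitions from the linked code.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from functools import singledispatch
from uuid import UUID, uuid1


@dataclass
class Command:
    # Hypothetical base class, defined here only to keep the sketch self-contained.
    command_id: UUID = field(default_factory=uuid1)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass
class Create(Command):
    pass


@dataclass
class UpdateValue(Command):
    value: str = ""


@singledispatch
def handle(command: Command) -> str:
    # Fallback for command classes that have no registered handler.
    raise NotImplementedError(type(command).__name__)


@handle.register
def _(command: Create) -> str:
    # Selected because the argument is an instance of Create.
    return "created"


@handle.register
def _(command: UpdateValue) -> str:
    # Selected because the argument is an instance of UpdateValue.
    return f"updated to {command.value}"
```

Because the handler is chosen from the type of the argument, adding a new command means adding a class and registering one function, without touching an `if`/`elif` chain.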
Component Events
Our component emits events when its internal state changes. Like commands, events are DTOs, but they are messages to the rest of the system about important domain events. Our component doesn't know who will listen and react to these events, and it should not care. That's the big difference between commands and events. Our DDD component should be self-sufficient, so it should not send commands to other components. If it does, it is probably a process manager, not a DDD component.
Each event should carry the related command_id (an application event can also carry other correlation IDs, such as user_id or session_id).
    @dataclass
    class Updated(Event):
        command_id: CommandID
        event_id: EventID = field(default_factory=uuid1)
        timestamp: datetime = field(default_factory=datetime.utcnow)
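A small sketch of how the correlation works in practice, assuming a hypothetical `Publisher` class and a simplified `Updated` event (neither is taken from the linked code). The publisher only knows that listeners are callables; it does not know who they are or what they do with the event.

```python
from dataclasses import dataclass, field
from typing import Callable, List
from uuid import UUID, uuid1


@dataclass
class Updated:
    # command_id ties the event back to the command that caused it.
    command_id: UUID
    event_id: UUID = field(default_factory=uuid1)


class Publisher:
    """Hypothetical minimal publisher, used only for this illustration."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[Updated], None]] = []

    def register(self, listener: Callable[[Updated], None]) -> None:
        if listener not in self._listeners:
            self._listeners.append(listener)

    def publish(self, event: Updated) -> None:
        # The component emits the event and does not care who reacts to it.
        for listener in self._listeners:
            listener(event)


received: List[Updated] = []
publisher = Publisher()
publisher.register(received.append)

command_id = uuid1()
publisher.publish(Updated(command_id=command_id))
```

A listener that receives the event can use `command_id` to find the command that triggered it, for example when tracing a request through logs.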
Application service
This is a facade for our component. It's the starting point, because all our BDD tests will be based on the facade API and all communication with our component goes through the facade.
I implement the application service as two objects. The Command Handler calls our aggregate with a command and takes care of persistence and other IO (such as emitting events). The Command Handler only changes the state of the aggregate and replies with the events generated by the processed commands.
The second object is the Query Handler. This one is read-only. If you need views of this component's state, you can use it. Depending on the implementation, it can use a different DB if there is a need.
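The post does not show a Query Handler, so the following is only a rough sketch of what one could look like. `EntityView`, the `get` method, and the dictionary used as a read store are all assumptions made for illustration; in a real component the store would be a DB session or a separate read database.

```python
from dataclasses import dataclass
from typing import Dict, Optional
from uuid import UUID, uuid4


@dataclass(frozen=True)
class EntityView:
    # Hypothetical read model: an immutable snapshot handed to callers.
    id: UUID
    value: Optional[str]


class QueryHandler:
    """Read-only side of the application service (illustrative sketch)."""

    def __init__(self, read_store: Dict[UUID, Optional[str]]) -> None:
        # A plain dict stands in for whatever storage backs the views.
        self._read_store = read_store

    def get(self, entity_id: UUID) -> EntityView:
        if entity_id not in self._read_store:
            raise LookupError(entity_id)
        # The handler never modifies the store; it only builds views from it.
        return EntityView(entity_id, self._read_store[entity_id])
```

Returning a frozen view object rather than the aggregate itself keeps the read side from becoming a back door for state changes.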
The point of the application service is to create a stable API for the component and to protect its boundaries, so that it can be easily refactored later.
If you know that a component should eventually have a DDD structure, but that is overkill at the beginning, you can start with the application service as a stable API and use only the DB model directly inside. If you create stable boundaries for a component, it will be easy to refactor safely.
    class CommandHandler:
        def __init__(self, repository: Repository) -> None:
            self._repository = repository
            self._listeners: List[Listener] = []
            super().__init__()

        def register(self, listener: Listener) -> None:
            if listener not in self._listeners:
                self._listeners.append(listener)

        def unregister(self, listener: Listener) -> None:
            if listener in self._listeners:
                self._listeners.remove(listener)

        @safe
        @singledispatchmethod
        def handle(self, command: Command) -> Optional[Event]:
            entity: Entity = self._repository.get(command.entity_id)
            event: Event = app_event(self._handle(command, entity), command)
            for listener in self._listeners:
                listener(event)
            self._repository.save(entity)
            return event

        @safe
        @handle.register(Create)
        def create(self, command: Create) -> Event:
            entity = Entity.create()
            self._repository.save(entity)
            return Created(command.command_id, entity.id)

        @singledispatchmethod
        def _handle(self, c: Command, u: Entity) -> Entity.Event:
            raise NotImplementedError

        @_handle.register(UpdateValue)
        def _(self, command: UpdateValue, entity: Entity) -> Entity.Event:
            return entity.update(command.value)
Aggregate
The aggregate is the core of a DDD architecture. All business logic should be placed here, with no dependencies on IO or other resources. Generally, this object provides only behaviors without exposing internal data, so it's hard to persist without accessing private data. That's why for persistence I use a DTO object that is handled by the repository.
This object only changes its internal state and emits events (not commands to other objects). This rule is one of the checks to apply when you are deciding whether to use a DDD component or another architecture (such as a process manager).
    EntityID = NewType('EntityID', UUID)


    class EntityDTO:
        id: EntityID
        value: Optional[Text]


    class Entity:
        id: EntityID
        dto: EntityDTO

        class Event:
            pass

        class Updated(Event):
            pass

        def __init__(self, dto: EntityDTO) -> None:
            self.id = dto.id
            self.dto = dto

        @classmethod
        def create(cls) -> 'Entity':
            dto = EntityDTO()
            dto.id = EntityID(uuid1())
            dto.value = None
            return Entity(dto)

        def update(self, value: Text) -> Updated:
            self.dto.value = value
            return self.Updated()
Repository
Persisting the aggregate is the responsibility of a repository. To make a DDD component reliable, the repository should also take care of transactions and of sending application events. A good practice is to save the aggregate DTO and the events in one transaction to the same DB, then send the events with async workers. With this approach, each event is sent at least once. If you do not need guaranteed delivery, you can send events directly, as in this example. Then each event is sent at most once.
    class ORMRepository(Repository):
        def __init__(self, session: Session):
            self._session = session
            self._query = self._session.query(EntityMapper)

        def get(self, entity_id: EntityID) -> Entity:
            dto = self._query.filter_by(uuid=entity_id).one_or_none()
            if not dto:
                raise NotFound(entity_id)
            return Entity(dto)

        def save(self, entity: Entity) -> None:
            self._session.add(entity.dto)
            self._session.flush()
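The repository above does not store events. As a rough sketch of the "same transaction, then async worker" approach, the example below uses the standard-library `sqlite3` module instead of a SQLAlchemy session, purely to stay self-contained. The `outbox` table, its columns, and the functions `open_db`, `save_with_event`, and `relay` are hypothetical names chosen for this illustration.

```python
import json
import sqlite3
from typing import Callable


def open_db() -> sqlite3.Connection:
    # In-memory database with one table for state and one for pending events.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE entities (uuid TEXT PRIMARY KEY, value TEXT)")
    conn.execute(
        "CREATE TABLE outbox ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "payload TEXT NOT NULL, "
        "sent INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def save_with_event(conn: sqlite3.Connection, uuid: str, value: str, event: dict) -> None:
    # One transaction: either both rows are committed or neither is.
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO entities (uuid, value) VALUES (?, ?)",
            (uuid, value),
        )
        conn.execute("INSERT INTO outbox (payload) VALUES (?)", (json.dumps(event),))


def relay(conn: sqlite3.Connection, send: Callable[[dict], None]) -> int:
    # Worker step: deliver pending events in insertion order.
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE sent = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        # If send raises, the row stays pending and is retried on the next run.
        send(json.loads(payload))
        # Mark as sent only after delivery succeeded (at-least-once semantics).
        with conn:
            conn.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,))
    return len(rows)
```

If the worker crashes after `send` succeeds but before the row is marked as sent, the event is delivered again on the next run, which is why listeners in such a setup should tolerate duplicates.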
For the DTO I like to keep the tables separate from the DTO class. So with SQLAlchemy I choose mappers over the declarative mode. The DTO is then pure Python and all the ORM magic lives in the repository module. Mappers also give you many tools to control how you map the DTO: you don't need all columns, and you can present data in a different form than you have in the tables.
    entities_t: Table = Table(
        'entities',
        meta,
        Column('id', Integer, primary_key=True, autoincrement=True),
        Column('uuid', String, unique=True, index=True),
        Column('value', String, nullable=True),
    )

    EntityMapper = mapper(
        EntityDTO,
        entities_t,
        properties={
            'id': entities_t.c.uuid,
            'value': entities_t.c.value,
        },
        column_prefix='_db_column_',
    )