This post is fifth experiment described in post “Experiment: 10 Different implementations of Aggregate“.
The goal of this experiment is to have separate objects for every state of issue. There is no id in domain classes, not shared state. Also, domain classes are not creating domain events.
class Issue(Protocol): def open(self) -> Issue: raise InvalidTransition('create') def start(self) -> Issue: raise InvalidTransition('start') def stop(self) -> Issue: raise InvalidTransition('stop') def close(self) -> Issue: raise InvalidTransition('close') def reopen(self) -> Issue: raise InvalidTransition('reopen') def resolve(self) -> Issue: raise InvalidTransition('resolve') class Init(Issue): def open(self) -> Open: return Open() class Open(Issue): def start(self) -> InProgress: return InProgress() def close(self) -> Closed: return Closed() def resolve(self) -> Resolved: return Resolved() class InProgress(Issue): def stop(self) -> Open: return Open() def close(self) -> Closed: return Closed() def resolve(self) -> Resolved: return Resolved() class Closed(Issue): def reopen(self) -> Open: return Open() class Resolved(Issue): def close(self) -> Closed: return Closed() def reopen(self) -> Open: return Open() class CommandHandler(Handler): def __call__(self, cmd: Command) -> None: issue, version = self.get_issue(cmd.id) event = self.process(cmd, issue)( originator_id=cmd.id, originator_version=version + 1, timestamp=datetime.now(tz=timezone.utc), ) self._event_store.put(event) @singledispatchmethod def process(self, cmd: Command, issue: Issue) -> Type[Event]: ... @process.register def create(self, _: CreateIssue, issue: Issue) -> Type[Event]: issue.open() return IssueOpened @process.register def start(self, _: StartIssueProgress, issue: Issue) -> Type[Event]: issue.start() return IssueProgressStarted @process.register def stop(self, _: StopIssueProgress, issue: Issue) -> Type[Event]: issue.stop() return IssueProgressStopped @process.register def close(self, _: CloseIssue, issue: Issue) -> Type[Event]: issue.close() return IssueClosed @process.register def reopen(self, _: ReopenIssue, issue: Issue) -> Type[Event]: issue.reopen() return IssueReopened @process.register def resolve(self, _: ResolveIssue, issue: Issue) -> Type[Event]: issue.resolve() return IssueResolved def get_issue(self, issue_id: IssueID) -> Tuple[Issue, int]: issue = Init() version = 0 for event in self._event_store.get(issue_id): event_type = type(event) if event_type == IssueOpened: issue = issue.open() elif event_type == IssueProgressStarted: issue = issue.start() elif event_type == IssueProgressStopped: issue = issue.stop() elif event_type == IssueReopened: issue = issue.reopen() elif event_type == IssueResolved: issue = issue.resolve() elif event_type == IssueClosed: issue = issue.close() version = event.originator_version return issue, version
Reflections on this experiment
I like this approach, but I see some downsides to this specific implementation. Reading aggregate state process all state passes in domain manner, so with checking of invalid transitions. In the production solution, it could not stay this way cause business rules will change and some transitions will start to be illegal. So command handler implementation should create a specific object for the last event.
But with bigger aggregates that would have more entities it also could be problematic. Maybe loading all events should still be carried by aggregate but without checking of business rules. In this case, emitting events should go to aggregate classes.
For me, this implementation for sure needs to be checked with a bigger aggregate before I will decide that I want to use it in production code. But for sure I love the immutability of objects and using the polymorphic approach to implement different states.
Be notified about next experiments
If you want to be notified join to my mailinglist.