This post is sixth experiment described in post “Experiment: 10 Different implementations of Aggregate“.
This experiment is the same as the previous one, but without inheriting or describing the protocol/interface for the issue. So the goal is to have separate domain objects per state, no id or messaging inside the domain part. Exception of invalid transition is generated in the handler by checking if the object responds to a specific command.
class Issue: def open(self) -> Open: return Open() class Open: def start(self) -> InProgress: return InProgress() def resolve(self) -> Resolved: return Resolved() def close(self) -> Closed: return Closed() class InProgress: def stop(self) -> Open: return Open() def resolve(self) -> Resolved: return Resolved() def close(self) -> Closed: return Closed() class Resolved: def close(self) -> Closed: return Closed() def reopen(self) -> Open: return Open() class Closed: def reopen(self) -> Open: return Open() class CommandHandler(Handler): def __call__(self, cmd: Command) -> None: issue, version = self.get_issue(cmd.id) event = self.process(cmd, issue)( originator_id=cmd.id, originator_version=version + 1, timestamp=datetime.now(tz=timezone.utc), ) self._event_store.put(event) @singledispatchmethod def process(self, cmd: Command, issue) -> Type[Event]: ... @process.register def create(self, _: CreateIssue, issue) -> Type[Event]: self.raise_invalid_unless_respond_to(issue, 'open') issue.open() return IssueOpened @process.register def start(self, _: StartIssueProgress, issue) -> Type[Event]: self.raise_invalid_unless_respond_to(issue, 'start') issue.start() return IssueProgressStarted @process.register def stop(self, _: StopIssueProgress, issue) -> Type[Event]: self.raise_invalid_unless_respond_to(issue, 'stop') issue.stop() return IssueProgressStopped @process.register def close(self, _: CloseIssue, issue) -> Type[Event]: self.raise_invalid_unless_respond_to(issue, 'close') issue.close() return IssueClosed @process.register def reopen(self, _: ReopenIssue, issue) -> Type[Event]: self.raise_invalid_unless_respond_to(issue, 'reopen') issue.reopen() return IssueReopened @process.register def resolve(self, _: ResolveIssue, issue) -> Type[Event]: self.raise_invalid_unless_respond_to(issue, 'resolve') issue.resolve() return IssueResolved def raise_invalid_unless_respond_to(self, issue, method: Text) -> None: if not ismethod(getattr(issue, method, None)): raise InvalidTransition(method) def get_issue( self, issue_id: IssueID ) -> Tuple[Union[Issue, Open, InProgress, Resolved, Closed], int]: issue = Issue() version = 0 for event in self._event_store.get(issue_id): event_type = type(event) if event_type == IssueOpened: issue = issue.open() elif event_type == IssueProgressStarted: issue = issue.start() elif event_type == IssueProgressStopped: issue = issue.stop() elif event_type == IssueReopened: issue = issue.reopen() elif event_type == IssueResolved: issue = issue.resolve() elif event_type == IssueClosed: issue = issue.close() version = event.originator_version return issue, version
Reflections on this experiment
I actually prefer to have a protocol describing the issue interface. It may be less OOP but give us more transparency on what is going on. Also, it’s easier for refactoring tools to work with such code.
On the other side, we can argue that duck typing is more readable for business people. But if you argue this way then the ruby version actually needs a little less boilerplate than python. In such approach, all additional code like type annotation is just boilerplate cause it’s not looking like plain English. Same time refactoring starts to be more difficult and for developers, it can be actually less readable.