Experiment 5: Polymorphic

This post is fifth experiment described in post “Experiment: 10 Different implementations of Aggregate“.

The goal of this experiment is to have separate objects for every state of issue. There is no id in domain classes, not shared state. Also, domain classes are not creating domain events.

class Issue(Protocol):
    def open(self) -> Issue:
        raise InvalidTransition('create')

    def start(self) -> Issue:
        raise InvalidTransition('start')

    def stop(self) -> Issue:
        raise InvalidTransition('stop')

    def close(self) -> Issue:
        raise InvalidTransition('close')

    def reopen(self) -> Issue:
        raise InvalidTransition('reopen')

    def resolve(self) -> Issue:
        raise InvalidTransition('resolve')


class Init(Issue):
    def open(self) -> Open:
        return Open()


class Open(Issue):
    def start(self) -> InProgress:
        return InProgress()

    def close(self) -> Closed:
        return Closed()

    def resolve(self) -> Resolved:
        return Resolved()


class InProgress(Issue):
    def stop(self) -> Open:
        return Open()

    def close(self) -> Closed:
        return Closed()

    def resolve(self) -> Resolved:
        return Resolved()


class Closed(Issue):
    def reopen(self) -> Open:
        return Open()


class Resolved(Issue):
    def close(self) -> Closed:
        return Closed()

    def reopen(self) -> Open:
        return Open()


class CommandHandler(Handler):
    def __call__(self, cmd: Command) -> None:
        issue, version = self.get_issue(cmd.id)
        event = self.process(cmd, issue)(
            originator_id=cmd.id,
            originator_version=version + 1,
            timestamp=datetime.now(tz=timezone.utc),
        )
        self._event_store.put(event)

    @singledispatchmethod
    def process(self, cmd: Command, issue: Issue) -> Type[Event]:
        ...

    @process.register
    def create(self, _: CreateIssue, issue: Issue) -> Type[Event]:
        issue.open()
        return IssueOpened

    @process.register
    def start(self, _: StartIssueProgress, issue: Issue) -> Type[Event]:
        issue.start()
        return IssueProgressStarted

    @process.register
    def stop(self, _: StopIssueProgress, issue: Issue) -> Type[Event]:
        issue.stop()
        return IssueProgressStopped

    @process.register
    def close(self, _: CloseIssue, issue: Issue) -> Type[Event]:
        issue.close()
        return IssueClosed

    @process.register
    def reopen(self, _: ReopenIssue, issue: Issue) -> Type[Event]:
        issue.reopen()
        return IssueReopened

    @process.register
    def resolve(self, _: ResolveIssue, issue: Issue) -> Type[Event]:
        issue.resolve()
        return IssueResolved

    def get_issue(self, issue_id: IssueID) -> Tuple[Issue, int]:
        issue = Init()
        version = 0
        for event in self._event_store.get(issue_id):
            event_type = type(event)
            if event_type == IssueOpened:
                issue = issue.open()
            elif event_type == IssueProgressStarted:
                issue = issue.start()
            elif event_type == IssueProgressStopped:
                issue = issue.stop()
            elif event_type == IssueReopened:
                issue = issue.reopen()
            elif event_type == IssueResolved:
                issue = issue.resolve()
            elif event_type == IssueClosed:
                issue = issue.close()
            version = event.originator_version
        return issue, version

Reflections on this experiment

I like this approach, but I see some downsides to this specific implementation. Reading aggregate state process all state passes in domain manner, so with checking of invalid transitions. In the production solution, it could not stay this way cause business rules will change and some transitions will start to be illegal. So command handler implementation should create a specific object for the last event.

But with bigger aggregates that would have more entities it also could be problematic. Maybe loading all events should still be carried by aggregate but without checking of business rules. In this case, emitting events should go to aggregate classes.

For me, this implementation for sure needs to be checked with a bigger aggregate before I will decide that I want to use it in production code. But for sure I love the immutability of objects and using the polymorphic approach to implement different states.

source (Arkency version)

Be notified about next experiments

If you want to be notified join to my mailinglist.

I don’t spam!

Hi there 👋
It’s nice to meet you.

Sign up to join my mailing list.

I don’t spam!