Experiment 1: Classical example with AggregateRoot

This post is first experiment described in post “Experiment: 10 Different implementations of Aggregate“.

This is a classical and most used example of event-sourcing implementation (Greg Young’s CQRS example). It’s dependent on the framework cause of inheritance from the AggregatRoot base class.

I like that it don’t have read methods, so all observability is based on Events. Internal state is used only for validating state changes.

Update: Framwork

While writing this post new release of the event-sourcing framework was released. The new version is incompatible with the old version. Also, my workaround in this experiment to mutate aggregate by aggregate not by the event is not working anymore I decided to use my own simplified event-sourcing lib. Just for this experiment.

new source and source with event-sourcing v8

New version

class Issue(Aggregate):
    state: State = None

    def create(self) -> None:
        if not self.can_create():
            raise InvalidTransition('create', self.id)
        self._trigger_event(IssueOpened)

    def start(self) -> None:
        if not self.can_start():
            raise InvalidTransition('start', self.id)
        self._trigger_event(IssueProgressStarted)

    def stop(self) -> None:
        if not self.can_stop():
            raise InvalidTransition('stop', self.id)
        self._trigger_event(IssueProgressStopped)

    def close(self) -> None:
        if not self.can_close():
            raise InvalidTransition('close', self.id)
        self._trigger_event(IssueClosed)

    def reopen(self) -> None:
        if not self.can_reopen():
            raise InvalidTransition('reopen', self.id)
        self._trigger_event(IssueReopened)

    def resolve(self) -> None:
        if not self.can_resolve():
            raise InvalidTransition('resolve', self.id)
        self._trigger_event(IssueResolved)

    def can_create(self) -> bool:
        return self.state != State.OPEN

    def can_start(self) -> bool:
        valid_states = [State.OPEN, State.REOPENED]
        return self.state in valid_states

    def can_close(self) -> bool:
        valid_states = [
            State.OPEN,
            State.IN_PROGRESS,
            State.REOPENED,
            State.RESOLVED,
        ]
        return self.state in valid_states

    def can_reopen(self) -> bool:
        valid_states = [State.CLOSED, State.RESOLVED]
        return self.state in valid_states

    def can_stop(self) -> bool:
        return self.state == State.IN_PROGRESS

    def can_resolve(self) -> bool:
        valid_states = [State.OPEN, State.REOPENED, State.IN_PROGRESS]
        return self.state in valid_states

    def apply(self, event: Event) -> None:
        event_type = type(event)
        if event_type == IssueOpened:
            self.state = State.OPEN
        elif event_type == IssueProgressStarted:
            self.state = State.IN_PROGRESS
        elif event_type == IssueProgressStopped:
            self.state = State.OPEN
        elif event_type == IssueReopened:
            self.state = State.REOPENED
        elif event_type == IssueResolved:
            self.state = State.RESOLVED
        elif event_type == IssueClosed:
            self.state = State.CLOSED
        super().apply(event)

    def __repr__(self) -> Text:
        return (
            f'<{self.__class__.__name__} '
            f'id={self.id!s} '
            f'version={self.version} '
            f'state={self.state and self.state.name}'
            f'>'
        )


class CommandHandler(Handler):
    def __init__(self, event_store: EventStore) -> None:
        super().__init__(event_store)
        self._repository = Repository[Issue](event_store=event_store)

    @singledispatchmethod
    def __call__(self, cmd: Command) -> None:
        ...

    @__call__.register
    def _(self, cmd: CreateIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.create()

    @__call__.register
    def _(self, cmd: CloseIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.close()

    @__call__.register
    def _(self, cmd: StartIssueProgress) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.start()

    @__call__.register
    def _(self, cmd: StopIssueProgress) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.stop()

    @__call__.register
    def _(self, cmd: ReopenIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.reopen()

    @__call__.register
    def _(self, cmd: ResolveIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.resolve()

    def aggregate(self, issue_id: IssueID) -> ContextManager[Issue]:
        return self._repository.aggregate(Issue(issue_id))

Event-sourcing v8 framwork version

class Issue(AggregateRoot):
    class State(Enum):
        OPEN = 'OPEN'
        CLOSED = 'CLOSED'
        IN_PROGRESS = 'IN_PROGRESS'
        REOPENED = 'REOPENED'
        RESOLVED = 'RESOLVED'

    state: State = None

    def create(self) -> None:
        if not self.can_create():
            raise InvalidTransition('create', self.id)
        self.__trigger_event__(IssueOpened)

    def start(self) -> None:
        if not self.can_start():
            raise InvalidTransition('start', self.id)
        self.__trigger_event__(IssueProgressStarted)

    def stop(self) -> None:
        if not self.can_stop():
            raise InvalidTransition('stop', self.id)
        self.__trigger_event__(IssueProgressStopped)

    def close(self) -> None:
        if not self.can_close():
            raise InvalidTransition('close', self.id)
        self.__trigger_event__(IssueClosed)

    def reopen(self) -> None:
        if not self.can_reopen():
            raise InvalidTransition('reopen', self.id)
        self.__trigger_event__(IssueReopened)

    def resolve(self) -> None:
        if not self.can_resolve():
            raise InvalidTransition('resolve', self.id)
        self.__trigger_event__(IssueResolved)

    def can_create(self) -> bool:
        return self.state != Issue.State.OPEN

    def can_start(self) -> bool:
        valid_states = [Issue.State.OPEN, Issue.State.REOPENED]
        return self.state in valid_states

    def can_close(self) -> bool:
        valid_states = [
            Issue.State.OPEN,
            Issue.State.IN_PROGRESS,
            Issue.State.REOPENED,
            Issue.State.RESOLVED,
        ]
        return self.state in valid_states

    def can_reopen(self) -> bool:
        valid_states = [Issue.State.CLOSED, Issue.State.RESOLVED]
        return self.state in valid_states

    def can_stop(self) -> bool:
        return self.state == Issue.State.IN_PROGRESS

    def can_resolve(self) -> bool:
        valid_states = [
            Issue.State.OPEN, Issue.State.REOPENED, Issue.State.IN_PROGRESS,
        ]
        return self.state in valid_states

    def __mutate__(self, event: TVersionedEvent) -> None:
        event_type = type(event)
        if event_type == IssueOpened:
            self.state = Issue.State.OPEN
        elif event_type == IssueProgressStarted:
            self.state = Issue.State.IN_PROGRESS
        elif event_type == IssueProgressStopped:
            self.state = Issue.State.OPEN
        elif event_type == IssueReopened:
            self.state = Issue.State.REOPENED
        elif event_type == IssueResolved:
            self.state = Issue.State.RESOLVED
        elif event_type == IssueClosed:
            self.state = Issue.State.CLOSED
        else:
            super().__mutate__(event)
        self.___version__ = event.originator_version

Python version comments (for old version)

Even if it’s the most classical example it’s not so easy with the python eventsourcing framework. In this framework processing events are out of aggregate root and by default you need a mutator function that has access to the internal state. To omit this limitation I needed to create a very limited mutator method in the command handler that was dispatching events to aggregate mutate method.

def _mutate(self, issue: Optional[Issue], event: Event) -> Issue:
    if issue is None:
        issue = Issue(
            id=event.originator_id,
            __created_on__=event.timestamp,
            __version__=0,
        )
    issue.__mutate__(event)
    return issue

Also to implement this example I needed to read source of eventsourcing framework to find out how to manage aggregate version.

I’m disappointed how many things I needed to check in source to implement this most classical and well-known implementation of event sourcing (used for example in Greg Young’s example). Our python eventsourcing framework simplest example is really framework dependent and I couldn’t use the same test for this example and others. Too big coupling between Events used in tests and implementation.

Be notified about next experiments

If you want to be notified join to my mailinglist.

I don’t spam!

Hi there 👋
It’s nice to meet you.

Sign up to join my mailing list.

I don’t spam!