Experiment 3: Aggregate with extracted state

This post is third experiment described in post “Experiment: 10 Different implementations of Aggregate“.

Experiment 3 is passing sourced IssueState into Issue (AggregateRoot). Business logic is handled by Issue and Issue is emitting domain events.

class IssueState:
    id: UUID
    version: int

    def __init__(self, issue_id: UUID) -> None:
        self.id: UUID = issue_id
        self.version = 0
        self._status: Optional[State] = None

    @property
    def open(self) -> bool:
        return self._status == State.OPEN

    @property
    def closed(self) -> bool:
        return self._status == State.CLOSED

    @property
    def in_progress(self) -> bool:
        return self._status == State.IN_PROGRESS

    @property
    def reopened(self) -> bool:
        return self._status == State.REOPENED

    @property
    def resolved(self) -> bool:
        return self._status == State.RESOLVED

    def apply(self, event: Event) -> None:
        event_type = type(event)
        if event_type == IssueOpened:
            self._status = State.OPEN
        elif event_type == IssueProgressStarted:
            self._status = State.IN_PROGRESS
        elif event_type == IssueProgressStopped:
            self._status = State.OPEN
        elif event_type == IssueReopened:
            self._status = State.REOPENED
        elif event_type == IssueResolved:
            self._status = State.RESOLVED
        elif event_type == IssueClosed:
            self._status = State.CLOSED
        self.version = event.originator_version

    def __repr__(self) -> Text:
        return (
            f'<{self.__class__.__name__} id={self.id} version={self.version}>'
        )

    def __str__(self) -> Text:
        return f'{self._status and self._status.name}'


class Issue:
    def __init__(self, state: IssueState) -> None:
        self._state = state
        self._changes: List[Event] = []

    @property
    def changes(self) -> Iterator[Event]:
        return iter(self._changes)

    def create(self) -> None:
        if not self.can_create:
            raise InvalidTransition('create', self._state.id)
        self._trigger_event(IssueOpened)

    def start(self) -> None:
        if not self.can_start:
            raise InvalidTransition('start', self._state.id)
        self._trigger_event(IssueProgressStarted)

    def stop(self) -> None:
        if not self.can_stop:
            raise InvalidTransition('stop', self._state.id)
        self._trigger_event(IssueProgressStopped)

    def close(self) -> None:
        if not self.can_close:
            raise InvalidTransition('close', self._state.id)
        self._trigger_event(IssueClosed)

    def reopen(self) -> None:
        if not self.can_reopen:
            raise InvalidTransition('reopen', self._state.id)
        self._trigger_event(IssueReopened)

    def resolve(self) -> None:
        if not self.can_resolve:
            raise InvalidTransition('resolve', self._state.id)
        self._trigger_event(IssueResolved)

    @property
    def can_create(self) -> bool:
        return not self._state.open

    @property
    def can_start(self) -> bool:
        return self._state.open or self._state.reopened

    @property
    def can_close(self) -> bool:
        return (
            self._state.open
            or self._state.in_progress
            or self._state.reopened
            or self._state.resolved
        )

    @property
    def can_reopen(self) -> bool:
        return self._state.closed or self._state.resolved

    @property
    def can_stop(self) -> bool:
        return self._state.in_progress

    @property
    def can_resolve(self) -> bool:
        return (
            self._state.open
            or self._state.reopened
            or self._state.in_progress
        )

    def _trigger_event(self, event_class: Type[TEvent]) -> None:
        new_event = event_class(
            originator_id=self._state.id,
            originator_version=self._state.version + 1,
            timestamp=datetime.now(tz=timezone.utc),
        )
        self._state.apply(new_event)
        self._changes.append(new_event)

    def __repr__(self) -> Text:
        return (
            f'<{self.__class__.__name__} '
            f'id={self._state.id!s} '
            f'version={self._state.version} '
            f'state={self._state!s}'
            f'>'
        )


class CommandHandler(Handler):
    @singledispatchmethod
    def __call__(self, cmd: Command) -> None:
        ...

    @__call__.register
    def create(self, cmd: CreateIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.create()

    @__call__.register
    def close(self, cmd: CloseIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.close()

    @__call__.register
    def start(self, cmd: StartIssueProgress) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.start()

    @__call__.register
    def stop(self, cmd: StopIssueProgress) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.stop()

    @__call__.register
    def reopen(self, cmd: ReopenIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.reopen()

    @__call__.register
    def resolve(self, cmd: ResolveIssue) -> None:
        with self.aggregate(cmd.id) as issue:
            issue.resolve()

    @contextmanager
    def aggregate(self, issue_id: IssueID) -> Iterator[Issue]:
        state = IssueState(issue_id)
        for event in self._event_store.2(issue_id):
            state.apply(event)
        issue = Issue(state)
        yield issue
        self._event_store.put(*issue.changes)

Reflections on this experiment

This one seems to be overcomplicated for me. I guess the benefits are in processing events separately by every object in the Aggregate tree (Entities, ValueObject) and project its state separately. A side-effect of this is that every object is aware that event-sourcing is used here and the application layer is mixed with the domain layer. Still logic of emitting events is in aggregate. Personally, I don’t like this one.

source (Arkency version)

Be notified about next experiments

If you want to be notified join to my mailinglist.

I don’t spam!

Hi there 👋
It’s nice to meet you.

Sign up to join my mailing list.

I don’t spam!