Experiment 3: Aggregate with extracted state

This post covers the third experiment described in the post “Experiment: 10 Different implementations of Aggregate”.

Experiment 3 passes an event-sourced IssueState into Issue (the AggregateRoot). The business logic is handled by Issue, and Issue is also the one emitting domain events.

class IssueState:
    """Status of a single issue, rebuilt by applying its events in order.

    The state starts with no status and version 0. Each call to ``apply``
    updates the status according to the event's type and copies the event's
    ``originator_version`` into ``version``.
    """

    id: UUID
    version: int

    def __init__(self, issue_id: UUID) -> None:
        """Create an empty state for the issue identified by *issue_id*."""
        self.id: UUID = issue_id
        self.version = 0
        # Stays None until an event that sets a status has been applied.
        self._status: Optional[State] = None

    def open(self) -> bool:
        """Return True when the current status is ``State.OPEN``."""
        return self._status == State.OPEN

    def closed(self) -> bool:
        """Return True when the current status is ``State.CLOSED``."""
        return self._status == State.CLOSED

    def in_progress(self) -> bool:
        """Return True when the current status is ``State.IN_PROGRESS``."""
        return self._status == State.IN_PROGRESS

    def reopened(self) -> bool:
        """Return True when the current status is ``State.REOPENED``."""
        return self._status == State.REOPENED

    def resolved(self) -> bool:
        """Return True when the current status is ``State.RESOLVED``."""
        return self._status == State.RESOLVED

    def apply(self, event: Event) -> None:
        """Update the status and version from *event*.

        Events are matched on their exact type. An event of any other type
        leaves the status untouched, but the version is still taken from it.
        """
        event_type = type(event)
        if event_type is IssueOpened:
            self._status = State.OPEN
        elif event_type is IssueProgressStarted:
            self._status = State.IN_PROGRESS
        elif event_type is IssueProgressStopped:
            # Stopping progress puts the issue back into the open status.
            self._status = State.OPEN
        elif event_type is IssueReopened:
            self._status = State.REOPENED
        elif event_type is IssueResolved:
            self._status = State.RESOLVED
        elif event_type is IssueClosed:
            self._status = State.CLOSED
        self.version = event.originator_version

    def __repr__(self) -> Text:
        """Return a debug representation with the issue id and version."""
        return (
            f'<{self.__class__.__name__} id={self.id} version={self.version}>'
        )

    def __str__(self) -> Text:
        """Return the status name, or ``'None'`` when no status is set yet."""
        return f'{self._status and self._status.name}'

class Issue:
    """Aggregate root that enforces issue transitions and emits events.

    The aggregate holds no status of its own: it asks the injected
    ``IssueState`` whether a transition is allowed, and on success creates
    the matching event, applies it to that state and records it as a
    pending change.
    """

    def __init__(self, state: IssueState) -> None:
        """Wrap *state*; the list of pending changes starts empty."""
        self._state = state
        self._changes: List[Event] = []

    def changes(self) -> Iterator[Event]:
        """Return an iterator over the events emitted by this instance."""
        return iter(self._changes)

    def create(self) -> None:
        """Open the issue.

        Raises:
            InvalidTransition: when ``can_create()`` is False.
        """
        if not self.can_create():
            raise InvalidTransition('create', self._state.id)
        self._trigger_event(IssueOpened)

    def start(self) -> None:
        """Start progress on the issue.

        Raises:
            InvalidTransition: when ``can_start()`` is False.
        """
        if not self.can_start():
            raise InvalidTransition('start', self._state.id)
        self._trigger_event(IssueProgressStarted)

    def stop(self) -> None:
        """Stop progress on the issue.

        Raises:
            InvalidTransition: when ``can_stop()`` is False.
        """
        if not self.can_stop():
            raise InvalidTransition('stop', self._state.id)
        self._trigger_event(IssueProgressStopped)

    def close(self) -> None:
        """Close the issue.

        Raises:
            InvalidTransition: when ``can_close()`` is False.
        """
        if not self.can_close():
            raise InvalidTransition('close', self._state.id)
        self._trigger_event(IssueClosed)

    def reopen(self) -> None:
        """Reopen the issue.

        Raises:
            InvalidTransition: when ``can_reopen()`` is False.
        """
        if not self.can_reopen():
            raise InvalidTransition('reopen', self._state.id)
        self._trigger_event(IssueReopened)

    def resolve(self) -> None:
        """Resolve the issue.

        Raises:
            InvalidTransition: when ``can_resolve()`` is False.
        """
        if not self.can_resolve():
            raise InvalidTransition('resolve', self._state.id)
        self._trigger_event(IssueResolved)

    # The state predicates are plain methods and must be called: a bare
    # ``self._state.open`` is a bound method, which is always truthy.

    def can_create(self) -> bool:
        """Return True unless the issue is currently open."""
        return not self._state.open()

    def can_start(self) -> bool:
        """Return True when the issue is open or reopened."""
        return self._state.open() or self._state.reopened()

    def can_close(self) -> bool:
        """Return True when the issue is in any status other than closed.

        An issue with no status yet cannot be closed either.
        """
        # NOTE(review): the first operand was missing in the source; ``open``
        # is assumed because it is the only remaining non-closed status.
        return (
            self._state.open()
            or self._state.in_progress()
            or self._state.reopened()
            or self._state.resolved()
        )

    def can_reopen(self) -> bool:
        """Return True when the issue is closed or resolved."""
        return self._state.closed() or self._state.resolved()

    def can_stop(self) -> bool:
        """Return True when the issue is in progress."""
        return self._state.in_progress()

    def can_resolve(self) -> bool:
        """Return True when the issue is open, reopened or in progress."""
        # NOTE(review): the first operand was missing in the source; ``open``
        # is assumed - confirm against the intended lifecycle.
        return (
            self._state.open()
            or self._state.reopened()
            or self._state.in_progress()
        )

    def _trigger_event(self, event_class: Type[TEvent]) -> None:
        """Create an event of *event_class*, apply it and record it."""
        # NOTE(review): the constructor call was truncated in the source and
        # only ``originator_version`` is visible; confirm whether the event
        # classes require further arguments (e.g. an originator id).
        new_event = event_class(
            originator_version=self._state.version + 1,
        )
        # Applying first advances the state's version, so a second event
        # emitted by the same instance gets the next version number.
        self._state.apply(new_event)
        self._changes.append(new_event)

    def __repr__(self) -> Text:
        """Return a debug representation with id, version and status."""
        # NOTE(review): the tail of this string was truncated in the source;
        # the ``state=`` part is an assumed completion.
        return (
            f'<{self.__class__.__name__} '
            f'id={self._state.id!s} '
            f'version={self._state.version} '
            f'state={self._state!s}>'
        )

from contextlib import contextmanager


class CommandHandler(Handler):
    """Route issue commands to the matching method of the ``Issue`` aggregate.

    Every handler method loads the aggregate for ``cmd.id`` through
    ``aggregate`` and invokes the transition named after the command.
    """

    def __call__(self, cmd: Command) -> None:
        """Dispatch *cmd* to the handler registered for its exact type.

        Raises:
            TypeError: when no handler exists for the command's type.
        """
        # NOTE(review): the dispatch body was missing in the source; the
        # mapping follows the parameter annotations of the methods below,
        # and rejecting unknown commands is an assumed policy.
        handlers = {
            CreateIssue: self.create,
            CloseIssue: self.close,
            StartIssueProgress: self.start,
            StopIssueProgress: self.stop,
            ReopenIssue: self.reopen,
            ResolveIssue: self.resolve,
        }
        handler = handlers.get(type(cmd))
        if handler is None:
            raise TypeError(f'Unsupported command: {type(cmd).__name__}')
        handler(cmd)

    def create(self, cmd: CreateIssue) -> None:
        """Create the issue identified by ``cmd.id``."""
        with self.aggregate(cmd.id) as issue:
            issue.create()

    def close(self, cmd: CloseIssue) -> None:
        """Close the issue identified by ``cmd.id``."""
        with self.aggregate(cmd.id) as issue:
            issue.close()

    def start(self, cmd: StartIssueProgress) -> None:
        """Start progress on the issue identified by ``cmd.id``."""
        with self.aggregate(cmd.id) as issue:
            issue.start()

    def stop(self, cmd: StopIssueProgress) -> None:
        """Stop progress on the issue identified by ``cmd.id``."""
        with self.aggregate(cmd.id) as issue:
            issue.stop()

    def reopen(self, cmd: ReopenIssue) -> None:
        """Reopen the issue identified by ``cmd.id``."""
        with self.aggregate(cmd.id) as issue:
            issue.reopen()

    def resolve(self, cmd: ResolveIssue) -> None:
        """Resolve the issue identified by ``cmd.id``."""
        with self.aggregate(cmd.id) as issue:
            issue.resolve()

    @contextmanager
    def aggregate(self, issue_id: IssueID) -> Iterator[Issue]:
        """Yield an ``Issue`` whose state is rebuilt from its stored events.

        Used as a context manager: the ``with`` body receives the aggregate
        and calls a transition on it.
        """
        state = IssueState(issue_id)
        # NOTE(review): the event store method name was garbled in the
        # source (".2"); ``get`` is a placeholder - confirm against the
        # actual event store API.
        for event in self._event_store.get(issue_id):
            state.apply(event)
        issue = Issue(state)
        yield issue
        # NOTE(review): nothing after the ``yield`` is visible in the source.
        # The events from ``issue.changes()`` presumably have to be written
        # back to the event store here - the write API is not shown.

Reflections on this experiment

This one seems overcomplicated to me. I guess the benefit lies in every object in the Aggregate tree (Entities, ValueObjects) processing events separately and projecting its own state. A side effect of this is that every object is aware that event sourcing is used here, so the application layer is mixed with the domain layer. Still, the logic of emitting events stays in the aggregate. Personally, I don’t like this one.

source (Arkency version)

Be notified about next experiments

If you want to be notified, join my mailing list.

I don’t spam!


Hi there 👋
It’s nice to meet you.

Sign up to join my mailing list.

I don’t spam!