Experiment 2: Aggregate with exposed queries

This post covers the second experiment in the series described in the post “Experiment: 10 Different implementations of Aggregate”.

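This implementation assumes that the aggregate is built from a projection of events. The aggregate is not aware of events or of anything else related to event sourcing. Its responsibility is limited to managing internal state and business logic: it exposes methods that change the state and queries (the can_* methods) that tell whether a transition is allowed.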

The interesting part for me is the separation of responsibilities. Event sourcing lives entirely in the application layer, and the domain logic is unaware of any application details, including the events themselves.

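from dataclasses import dataclass
from datetime import datetime, timezone
from functools import singledispatchmethod
from typing import Optional, Type

# IssueID, State, the event and command classes, EventStore, Handler,
# InvalidTransition and TEvent are defined elsewhere in the project
# and are not shown here.


@dataclass
class Issue:
    """Domain aggregate: holds state only and knows nothing about events."""
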
    id: IssueID
    version: int = 0
    _state: Optional[State] = None

    def create(self) -> None:
        self._state = State.OPEN

    def start(self) -> None:
        self._state = State.IN_PROGRESS

    def stop(self) -> None:
        self._state = State.OPEN

    def close(self) -> None:
        self._state = State.CLOSED

    def reopen(self) -> None:
        self._state = State.REOPENED

    def resolve(self) -> None:
        self._state = State.RESOLVED

    def can_create(self) -> bool:
        return self._state != State.OPEN

    def can_start(self) -> bool:
        valid_states = [State.OPEN, State.REOPENED]
        return self._state in valid_states

    def can_close(self) -> bool:
        valid_states = [
            State.OPEN,
            State.IN_PROGRESS,
            State.REOPENED,
            State.RESOLVED,
        ]
        return self._state in valid_states

    def can_reopen(self) -> bool:
        valid_states = [State.CLOSED, State.RESOLVED]
        return self._state in valid_states

    def can_stop(self) -> bool:
        return self._state == State.IN_PROGRESS

    def can_resolve(self) -> bool:
        valid_states = [State.OPEN, State.REOPENED, State.IN_PROGRESS]
        return self._state in valid_states


class IssueProjection:
    """Rebuilds an Issue by replaying its stored events (application layer)."""

    def __init__(self, event_store: EventStore) -> None:
        self.event_store = event_store

    def __call__(self, issue: Issue) -> Issue:
        for event in self.event_store.get(issue.id):
            self.apply(event, issue)
            issue.version = event.originator_version
        return issue

    def apply(self, event: Event, issue: Issue) -> None:
        event_type = type(event)
        if event_type == IssueOpened:
            issue.create()
        elif event_type == IssueProgressStarted:
            issue.start()
        elif event_type == IssueProgressStopped:
            issue.stop()
        elif event_type == IssueReopened:
            issue.reopen()
        elif event_type == IssueResolved:
            issue.resolve()
        elif event_type == IssueClosed:
            issue.close()


class CommandHandler(Handler):
    """Loads the current Issue, checks the transition, and records an event."""

    def __call__(self, cmd: Command) -> None:
        projection = IssueProjection(self._event_store)
        issue = projection(Issue(cmd.id))
        self.process(cmd, issue)

    @singledispatchmethod
    def process(self, cmd: Command, issue: Issue) -> None:
        # Fallback: a command type with no registered handler is silently ignored.
        ...

    @process.register
    def create(self, _: CreateIssue, issue: Issue) -> None:
        if not issue.can_create():
            raise InvalidTransition('create', issue.id)
        self._trigger_event(issue, IssueOpened)

    @process.register
    def start(self, _: StartIssueProgress, issue: Issue) -> None:
        if not issue.can_start():
            raise InvalidTransition('start', issue.id)
        self._trigger_event(issue, IssueProgressStarted)

    @process.register
    def stop(self, _: StopIssueProgress, issue: Issue) -> None:
        if not issue.can_stop():
            raise InvalidTransition('stop', issue.id)
        self._trigger_event(issue, IssueProgressStopped)

    @process.register
    def close(self, _: CloseIssue, issue: Issue) -> None:
        if not issue.can_close():
            raise InvalidTransition('close', issue.id)
        self._trigger_event(issue, IssueClosed)

    @process.register
    def reopen(self, _: ReopenIssue, issue: Issue) -> None:
        if not issue.can_reopen():
            raise InvalidTransition('reopen', issue.id)
        self._trigger_event(issue, IssueReopened)

    @process.register
    def resolve(self, _: ResolveIssue, issue: Issue) -> None:
        if not issue.can_resolve():
            raise InvalidTransition('resolve', issue.id)
        self._trigger_event(issue, IssueResolved)

    def _trigger_event(self, issue: Issue, event_class: Type[TEvent]) -> None:
        event = event_class(
            originator_id=issue.id,
            originator_version=issue.version + 1,
            timestamp=datetime.now(tz=timezone.utc),
        )
        self._event_store.put(event)
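
To see the pieces working together, here is a minimal, self-contained sketch of the same flow. It is an illustration under assumptions, not the code from the repository: InMemoryEventStore, the string-based Event.name, the load and handle helpers, and the trimmed two-state Issue are all hypothetical stand-ins for the real classes.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional


class State(Enum):
    OPEN = "open"
    CLOSED = "closed"


class InvalidTransition(Exception):
    pass


@dataclass(frozen=True)
class Event:
    originator_id: str
    originator_version: int
    name: str  # hypothetical: the post uses one class per event type


@dataclass
class Issue:
    """Trimmed aggregate: two transitions, no knowledge of events."""
    id: str
    version: int = 0
    _state: Optional[State] = None

    def create(self) -> None:
        self._state = State.OPEN

    def close(self) -> None:
        self._state = State.CLOSED

    def can_create(self) -> bool:
        return self._state != State.OPEN

    def can_close(self) -> bool:
        return self._state == State.OPEN


class InMemoryEventStore:
    """Hypothetical store exposing the same get/put calls as the post."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[Event]] = {}

    def get(self, originator_id: str) -> List[Event]:
        return list(self._streams.get(originator_id, []))

    def put(self, event: Event) -> None:
        self._streams.setdefault(event.originator_id, []).append(event)


# Application layer: maps each stored event to a state change on the aggregate.
APPLY = {"opened": Issue.create, "closed": Issue.close}


def load(store: InMemoryEventStore, issue_id: str) -> Issue:
    """Projection: replay the stream onto a fresh aggregate."""
    issue = Issue(issue_id)
    for event in store.get(issue_id):
        APPLY[event.name](issue)
        issue.version = event.originator_version
    return issue


def handle(store: InMemoryEventStore, issue_id: str, command: str) -> None:
    """Command handler: ask the aggregate, then record the event."""
    issue = load(store, issue_id)
    if command == "create" and issue.can_create():
        name = "opened"
    elif command == "close" and issue.can_close():
        name = "closed"
    else:
        raise InvalidTransition(command, issue_id)
    store.put(Event(issue_id, issue.version + 1, name))
```

Note that the handler never calls create() or close() itself. It only asks the aggregate whether the transition is allowed and then stores an event; the state changes on the next replay.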

Reflections on this experiment

In the blog post “My structure for DDD component” you can see that I have two kinds of events. The first is an aggregate event, which carries minimal data (or even no data at all). The second is an application event, which is enriched with context data such as the command_id that triggered it, the aggregate, the timestamp of the event, and so on. These attributes are useful for the application but are not important for business logic. This experiment gave me an interesting idea: events in the DDD component could belong to the application layer, where IO (such as reading the current DateTime) is totally fine.

  • clear separation of Aggregate (Domain layer) from any details of application implementation (even event-sourcing technique)
  • events are fully in the application layer so there is no problem with enriching events with any additional attributes
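
One way the split between the two kinds of events could look is sketched below. This is only an assumption about the shape, not the structure from the referenced post: ApplicationEvent, enrich, and the clock parameter are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable
from uuid import UUID, uuid4


@dataclass(frozen=True)
class IssueOpened:
    """Aggregate-level event: carries no data beyond its type."""


@dataclass(frozen=True)
class ApplicationEvent:
    """Hypothetical wrapper holding the context the application cares about."""
    payload: object
    originator_id: str
    originator_version: int
    command_id: UUID
    timestamp: datetime


def utc_now() -> datetime:
    return datetime.now(tz=timezone.utc)


def enrich(
    payload: object,
    originator_id: str,
    originator_version: int,
    command_id: UUID,
    clock: Callable[[], datetime] = utc_now,
) -> ApplicationEvent:
    """Runs in the application layer, so reading the clock here is fine."""
    return ApplicationEvent(
        payload=payload,
        originator_id=originator_id,
        originator_version=originator_version,
        command_id=command_id,
        timestamp=clock(),
    )
```

Passing the clock as a parameter keeps the IO at the edge: the aggregate never sees a timestamp, and tests can supply a fixed time.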

source (Arkency version)
