This post is the first experiment described in the post “Experiment: 10 Different implementations of Aggregate”.
This is the classic and most widely used example of an event-sourcing implementation (Greg Young’s CQRS example). It depends on the framework because the aggregate inherits from the AggregateRoot base class.
I like that it doesn’t have read methods, so all observability is based on events. Internal state is used only for validating state changes.
Update: Framework
While I was writing this post, a new release of the eventsourcing framework came out. The new version is incompatible with the old one. Also, my workaround in this experiment, mutating the aggregate through the aggregate itself rather than through the event, no longer works, so I decided to use my own simplified event-sourcing library, just for this experiment.
new source and source with event-sourcing v8
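The simplified library itself is not reproduced in this post. As a rough sketch of what such a base class might look like, the core can be quite small: `_trigger_event` builds an event, applies it, and records it. The `Event` fields, the `changes` list, and the `DemoIssue`/`DemoIssueOpened` names below are assumptions made for illustration, not the actual library.

```python
from dataclasses import dataclass
from typing import List, Type
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Event:
    # Hypothetical event shape: owning aggregate and position in its stream.
    originator_id: UUID
    originator_version: int


class Aggregate:
    def __init__(self, aggregate_id: UUID) -> None:
        self.id = aggregate_id
        self.version = 0
        self.changes: List[Event] = []  # events triggered but not yet stored

    def _trigger_event(self, event_type: Type[Event]) -> None:
        # Create the next event, apply it to the state, then record it.
        event = event_type(self.id, self.version + 1)
        self.apply(event)
        self.changes.append(event)

    def apply(self, event: Event) -> None:
        # Subclasses update their own state first, then call super().apply().
        self.version = event.originator_version


class DemoIssueOpened(Event):
    pass


class DemoIssue(Aggregate):
    opened = False

    def create(self) -> None:
        if self.opened:
            raise ValueError('already opened')
        self._trigger_event(DemoIssueOpened)

    def apply(self, event: Event) -> None:
        if type(event) == DemoIssueOpened:
            self.opened = True
        super().apply(event)


issue = DemoIssue(uuid4())
issue.create()
# The outside world observes the recorded events, not the internal state.
emitted = [type(e).__name__ for e in issue.changes]
```

With this shape there is no need for read methods: a test can assert on `issue.changes` alone.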
New version
    class Issue(Aggregate):
        state: State = None

        def create(self) -> None:
            if not self.can_create():
                raise InvalidTransition('create', self.id)
            self._trigger_event(IssueOpened)

        def start(self) -> None:
            if not self.can_start():
                raise InvalidTransition('start', self.id)
            self._trigger_event(IssueProgressStarted)

        def stop(self) -> None:
            if not self.can_stop():
                raise InvalidTransition('stop', self.id)
            self._trigger_event(IssueProgressStopped)

        def close(self) -> None:
            if not self.can_close():
                raise InvalidTransition('close', self.id)
            self._trigger_event(IssueClosed)

        def reopen(self) -> None:
            if not self.can_reopen():
                raise InvalidTransition('reopen', self.id)
            self._trigger_event(IssueReopened)

        def resolve(self) -> None:
            if not self.can_resolve():
                raise InvalidTransition('resolve', self.id)
            self._trigger_event(IssueResolved)

        def can_create(self) -> bool:
            return self.state != State.OPEN

        def can_start(self) -> bool:
            valid_states = [State.OPEN, State.REOPENED]
            return self.state in valid_states

        def can_close(self) -> bool:
            valid_states = [
                State.OPEN,
                State.IN_PROGRESS,
                State.REOPENED,
                State.RESOLVED,
            ]
            return self.state in valid_states

        def can_reopen(self) -> bool:
            valid_states = [State.CLOSED, State.RESOLVED]
            return self.state in valid_states

        def can_stop(self) -> bool:
            return self.state == State.IN_PROGRESS

        def can_resolve(self) -> bool:
            valid_states = [State.OPEN, State.REOPENED, State.IN_PROGRESS]
            return self.state in valid_states

        def apply(self, event: Event) -> None:
            event_type = type(event)
            if event_type == IssueOpened:
                self.state = State.OPEN
            elif event_type == IssueProgressStarted:
                self.state = State.IN_PROGRESS
            elif event_type == IssueProgressStopped:
                self.state = State.OPEN
            elif event_type == IssueReopened:
                self.state = State.REOPENED
            elif event_type == IssueResolved:
                self.state = State.RESOLVED
            elif event_type == IssueClosed:
                self.state = State.CLOSED
            super().apply(event)

        def __repr__(self) -> Text:
            return (
                f'<{self.__class__.__name__} '
                f'id={self.id!s} '
                f'version={self.version} '
                f'state={self.state and self.state.name}'
                f'>'
            )


    class CommandHandler(Handler):
        def __init__(self, event_store: EventStore) -> None:
            super().__init__(event_store)
            self._repository = Repository[Issue](event_store=event_store)

        @singledispatchmethod
        def __call__(self, cmd: Command) -> None:
            ...

        @__call__.register
        def _(self, cmd: CreateIssue) -> None:
            with self.aggregate(cmd.id) as issue:
                issue.create()

        @__call__.register
        def _(self, cmd: CloseIssue) -> None:
            with self.aggregate(cmd.id) as issue:
                issue.close()

        @__call__.register
        def _(self, cmd: StartIssueProgress) -> None:
            with self.aggregate(cmd.id) as issue:
                issue.start()

        @__call__.register
        def _(self, cmd: StopIssueProgress) -> None:
            with self.aggregate(cmd.id) as issue:
                issue.stop()

        @__call__.register
        def _(self, cmd: ReopenIssue) -> None:
            with self.aggregate(cmd.id) as issue:
                issue.reopen()

        @__call__.register
        def _(self, cmd: ResolveIssue) -> None:
            with self.aggregate(cmd.id) as issue:
                issue.resolve()

        def aggregate(self, issue_id: IssueID) -> ContextManager[Issue]:
            return self._repository.aggregate(Issue(issue_id))
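The command handler above routes each command to a method by its type using `functools.singledispatchmethod` (available since Python 3.8). Here is a stripped-down sketch of only that dispatch mechanism. The `Ping` and `Pong` commands and the `log` list are hypothetical stand-ins, and unlike the handler above, the fallback here raises instead of silently ignoring unknown commands.

```python
from dataclasses import dataclass
from functools import singledispatchmethod
from typing import List


@dataclass
class Command:
    id: int


@dataclass
class Ping(Command):
    pass


@dataclass
class Pong(Command):
    pass


class DemoHandler:
    def __init__(self) -> None:
        self.log: List[str] = []

    @singledispatchmethod
    def __call__(self, cmd: Command) -> None:
        # Fallback: reached when no handler is registered for type(cmd).
        raise NotImplementedError(type(cmd).__name__)

    @__call__.register
    def _(self, cmd: Ping) -> None:
        # Selected via the type annotation on `cmd`.
        self.log.append(f'ping {cmd.id}')

    @__call__.register
    def _(self, cmd: Pong) -> None:
        self.log.append(f'pong {cmd.id}')


handler = DemoHandler()
handler(Ping(1))
handler(Pong(2))
```

Each registered method reuses the name `_` because only the registration matters; the dispatcher picks the implementation from the annotated type of the first non-`self` argument.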
Event-sourcing v8 framework version
    class Issue(AggregateRoot):
        class State(Enum):
            OPEN = 'OPEN'
            CLOSED = 'CLOSED'
            IN_PROGRESS = 'IN_PROGRESS'
            REOPENED = 'REOPENED'
            RESOLVED = 'RESOLVED'

        state: State = None

        def create(self) -> None:
            if not self.can_create():
                raise InvalidTransition('create', self.id)
            self.__trigger_event__(IssueOpened)

        def start(self) -> None:
            if not self.can_start():
                raise InvalidTransition('start', self.id)
            self.__trigger_event__(IssueProgressStarted)

        def stop(self) -> None:
            if not self.can_stop():
                raise InvalidTransition('stop', self.id)
            self.__trigger_event__(IssueProgressStopped)

        def close(self) -> None:
            if not self.can_close():
                raise InvalidTransition('close', self.id)
            self.__trigger_event__(IssueClosed)

        def reopen(self) -> None:
            if not self.can_reopen():
                raise InvalidTransition('reopen', self.id)
            self.__trigger_event__(IssueReopened)

        def resolve(self) -> None:
            if not self.can_resolve():
                raise InvalidTransition('resolve', self.id)
            self.__trigger_event__(IssueResolved)

        def can_create(self) -> bool:
            return self.state != Issue.State.OPEN

        def can_start(self) -> bool:
            valid_states = [Issue.State.OPEN, Issue.State.REOPENED]
            return self.state in valid_states

        def can_close(self) -> bool:
            valid_states = [
                Issue.State.OPEN,
                Issue.State.IN_PROGRESS,
                Issue.State.REOPENED,
                Issue.State.RESOLVED,
            ]
            return self.state in valid_states

        def can_reopen(self) -> bool:
            valid_states = [Issue.State.CLOSED, Issue.State.RESOLVED]
            return self.state in valid_states

        def can_stop(self) -> bool:
            return self.state == Issue.State.IN_PROGRESS

        def can_resolve(self) -> bool:
            valid_states = [
                Issue.State.OPEN,
                Issue.State.REOPENED,
                Issue.State.IN_PROGRESS,
            ]
            return self.state in valid_states

        def __mutate__(self, event: TVersionedEvent) -> None:
            event_type = type(event)
            if event_type == IssueOpened:
                self.state = Issue.State.OPEN
            elif event_type == IssueProgressStarted:
                self.state = Issue.State.IN_PROGRESS
            elif event_type == IssueProgressStopped:
                self.state = Issue.State.OPEN
            elif event_type == IssueReopened:
                self.state = Issue.State.REOPENED
            elif event_type == IssueResolved:
                self.state = Issue.State.RESOLVED
            elif event_type == IssueClosed:
                self.state = Issue.State.CLOSED
            else:
                super().__mutate__(event)
            self.___version__ = event.originator_version
Python version comments (for old version)
Even though it’s the most classic example, it’s not so easy to implement with the Python eventsourcing framework. In this framework, events are processed outside the aggregate root, and by default you need a mutator function that has access to the internal state. To work around this limitation, I had to create a very small mutator method in the command handler that dispatches events to the aggregate’s __mutate__ method.
    def _mutate(self, issue: Optional[Issue], event: Event) -> Issue:
        if issue is None:
            issue = Issue(
                id=event.originator_id,
                __created_on__=event.timestamp,
                __version__=0,
            )
        issue.__mutate__(event)
        return issue
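A mutator in this sense is a function of the form `(state, event) -> state`, so rebuilding an aggregate amounts to a left fold over its event history. The following framework-free sketch illustrates only that idea; the `Opened` and `Closed` events, the string state, and the `replay` helper are hypothetical and are not part of the eventsourcing framework.

```python
from functools import reduce
from typing import Iterable, Optional


class Opened:
    pass


class Closed:
    pass


def mutate(state: Optional[str], event: object) -> Optional[str]:
    # Return the next state for a known event; ignore unknown events.
    if isinstance(event, Opened):
        return 'OPEN'
    if isinstance(event, Closed):
        return 'CLOSED'
    return state


def replay(events: Iterable[object]) -> Optional[str]:
    # Start from "no aggregate yet" (None) and fold every event in order.
    return reduce(mutate, events, None)


final_state = replay([Opened(), Closed()])
```

The `_mutate` method above plays the role of `mutate` here: it creates the aggregate on the first event and then delegates to the aggregate's own `__mutate__`.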
Also, to implement this example I had to read the source of the eventsourcing framework to find out how to manage the aggregate version.
I’m disappointed by how many things I had to look up in the source to implement this most classic and well-known form of event sourcing (used, for example, in Greg Young’s example). With the Python eventsourcing framework, even the simplest example is heavily framework dependent, and I couldn’t reuse the tests from the other examples for this one: the coupling between the events used in the tests and the implementation is too tight.