Experiment: 10 Different implementations of Aggregate

Arkency is a company that shares its experiments with many approaches to DDD aggregate implementation. Some time ago I shared my view on this topic here.

Arkency examples are in Ruby and use an event-sourcing approach. It’s quite different from mine, so some future blog posts will be about reimplementing the Arkency examples in Python.

Experiment subject

The goal of the experiment is to implement the typical workflow of a Jira issue.

Testing strategy

The goal is to use exactly the same test suite for every experiment. To achieve this, I’m using pytest with the pytest_generate_tests hook, which parametrizes the whole test suite with every implemented command handler.

def pytest_generate_tests(metafunc):
    handlers = [
        aggregate_root.CommandHandler,
    ]
    names = [h.__module__ for h in handlers]
    metafunc.parametrize('handler_cls', handlers, ids=names)

The scenarios are as close to the Ruby version as I could make them.

def test_start_from_stopped(self):
    self.arrange(CreateIssue, StartIssueProgress, StopIssueProgress)
    with self.assert_started():
        self.act(StartIssueProgress)

def test_resolve_from_opened(self):
    self.arrange(CreateIssue)
    with self.assert_resolved():
        self.act(ResolveIssue)
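
The arrange, act and assert helpers used in these scenarios are not shown here, so below is only a minimal sketch of how such helpers could work. Everything in it is an assumption made for illustration: the `Scenario` class, the `FakeHandler` stand-in, the `id` field on commands, the event names and the generic `assert_events` helper (the real suite uses more specific ones such as `assert_started`).

```python
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, List
from uuid import UUID, uuid4


@dataclass(frozen=True)
class CreateIssue:  # hypothetical command shape
    id: UUID


@dataclass(frozen=True)
class StartIssueProgress:  # hypothetical command shape
    id: UUID


class FakeHandler:
    """Hypothetical stand-in for a command handler: records one event name per command."""

    EVENT_NAMES = {
        CreateIssue: 'IssueOpened',
        StartIssueProgress: 'IssueProgressStarted',
    }

    def __init__(self) -> None:
        self.events: List[str] = []

    def __call__(self, command: object) -> None:
        self.events.append(self.EVENT_NAMES[type(command)])


class Scenario:
    def __init__(self, handler: FakeHandler) -> None:
        self.handler = handler
        self.issue_id = uuid4()

    def arrange(self, *command_classes: type) -> None:
        # Bring the issue into the starting state by replaying commands.
        for command_class in command_classes:
            self.act(command_class)

    def act(self, command_class: type) -> None:
        self.handler(command_class(id=self.issue_id))

    @contextmanager
    def assert_events(self, *expected: str) -> Iterator[None]:
        # Only events emitted inside the with-block are compared.
        before = len(self.handler.events)
        yield
        assert tuple(self.handler.events[before:]) == expected


scenario = Scenario(FakeHandler())
scenario.arrange(CreateIssue)
with scenario.assert_events('IssueProgressStarted'):
    scenario.act(StartIssueProgress)
```

The point of this shape is that a scenario only talks in commands and observed events, so the same scenario can run against any handler implementation.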

The full test code is available here.

Experiments

Here are examples that I plan to reimplement.

Framework

The Ruby experiments are based on the RailsEventStore library. An event store is not hard to write, but I decided to use a popular Python library. I found the eventsourcing framework, which is much more than an event store: it’s a full framework with many of the things I need for these experiments, so it seemed like a good choice. But after writing the tests and learning the framework, I have several objections to it.

Events mutate the Aggregate problem

This means that the Event object is not a DTO. The boundary of a component is defined by its commands and events, so if an event contains business logic, I cannot separate that logic from the tests.
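
To illustrate the alternative, here is a minimal sketch in which the event is a plain immutable DTO and all state changes live in the aggregate. The `IssueResolved` event and the `Issue` class are hypothetical names made up for this example; this is not code from the eventsourcing framework.

```python
from dataclasses import FrozenInstanceError, dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class IssueResolved:
    """Hypothetical event: carries data only, no behaviour."""
    originator_id: UUID
    originator_version: int


class Issue:
    """Hypothetical aggregate: it decides how an event changes its state."""

    def __init__(self, issue_id: UUID) -> None:
        self.id = issue_id
        self.version = 0
        self.state = 'open'

    def apply(self, event: IssueResolved) -> None:
        if isinstance(event, IssueResolved):
            self.state = 'resolved'
        self.version = event.originator_version


issue = Issue(uuid4())
event = IssueResolved(originator_id=issue.id, originator_version=1)
issue.apply(event)

# The event itself cannot be modified after creation.
try:
    event.originator_version = 2
    event_is_immutable = False
except FrozenInstanceError:
    event_is_immutable = True
```

With this split, a test can build events as plain data and compare them by value without running any domain logic.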

Multiple class inheritance problem

In my opinion, this is an anti-pattern because of the diamond problem. Java ruled out multiple class inheritance from the start, because it’s hard to track which functionality runs in which order. It also makes the implementation much harder to read.
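
A small, self-contained illustration of the ordering issue in Python. The class names are invented for this example and have nothing to do with the framework's own classes: only the order of the base classes differs, yet the call order changes.

```python
class Base:
    def describe(self) -> list:
        return ['Base']


class Left(Base):
    def describe(self) -> list:
        return ['Left'] + super().describe()


class Right(Base):
    def describe(self) -> list:
        return ['Right'] + super().describe()


class Both(Left, Right):
    pass


class Swapped(Right, Left):
    pass


# The method resolution order decides which implementation runs first.
both_order = Both().describe()        # ['Left', 'Right', 'Base']
swapped_order = Swapped().describe()  # ['Right', 'Left', 'Base']
```

Note that `super()` inside `Left` reaches `Right`, not `Base`, when called on a `Both` instance, which is exactly the kind of thing that is hard to see when reading a single class.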

Artificial events that promote bad habits

One of the basic pre-implemented events in this framework is AttributeChanged. This kind of event is what I call a ‘mechanical’ event. It does not carry any domain knowledge and, in my opinion, it goes against DDD. I understand that it can sometimes be helpful in non-DDD components, but shipping it as one of the default events promotes bad habits.

A potential issue with resilience

The event store is just another listener to events. So in a distributed system, writing an event to the SQL DB in a transaction can fail while emitting the same event to a queue succeeds. This leaves the systems in an inconsistent state. I would prefer the event store to save and commit events to the DB before they are sent to other listeners. The framework’s strategy is fine only when everything lives in one application and shares the same transaction.
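
The difference between the two strategies can be sketched as follows. This is a simplified model of the problem, not the framework's actual code: `FailingStore`, `notify_all` and `store_then_notify` are hypothetical names, events are plain strings, and the listener order is chosen on purpose so that the queue is notified before the failing store.

```python
from typing import Callable, List

Listener = Callable[[str], None]


class FailingStore:
    """Hypothetical event store whose DB transaction always fails."""

    def put(self, event: str) -> None:
        raise RuntimeError('DB transaction failed')


def notify_all(event: str, listeners: List[Listener]) -> None:
    # The store is treated as just one listener among others.
    for listener in listeners:
        listener(event)


def store_then_notify(
        event: str, store: FailingStore, listeners: List[Listener],
) -> None:
    # Persist first; listeners are reached only if the write succeeded.
    store.put(event)
    notify_all(event, listeners)


store = FailingStore()

queue_a: List[str] = []
try:
    notify_all('IssueOpened', [queue_a.append, store.put])
except RuntimeError:
    pass  # the queue already holds an event that the store never saved

queue_b: List[str] = []
try:
    store_then_notify('IssueOpened', store, [queue_b.append])
except RuntimeError:
    pass  # nothing was published, so both sides still agree
```

In the first case `queue_a` contains the event although it was never stored; in the second case `queue_b` stays empty.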

Update 1: Framework

After the first experiment, the eventsourcing framework released a new version, 9.0.0, which is not compatible with the previous one. It resolved issues like multiple class inheritance and promoting bad habits, and it also simplified the code a lot. But my main concern was about events mutating the aggregate, and this is now enforced even more strongly. So I decided to write a simplified event-sourcing library for my experiments. It’s in-memory only and focused on reproducing the Arkency examples.

Event store

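from collections import defaultdict
from typing import Dict, Generic, List, Optional, TypeVar
from uuid import UUID

# Event is the base event class of the library (not shown in this post).
TEvent = TypeVar('TEvent', bound='Event')


class EventStore(Generic[TEvent]):
    def __init__(self) -> None:
        # Events are grouped by the id of the aggregate that produced them.
        self._in_memory: Dict[UUID, List[TEvent]] = defaultdict(list)

    def put(self, *events: TEvent) -> None:
        for event in events:
            self._in_memory[event.originator_id].append(event)

    def get(
            self,
            originator_id: UUID,
            after_version: Optional[int] = None,
            to_version: Optional[int] = None,
            desc: bool = False,
            limit: Optional[int] = None,
    ) -> List[TEvent]:
        def event_is_valid(event: TEvent) -> bool:
            version = event.originator_version
            # Compare against None explicitly so that a bound of 0 is honoured.
            if after_version is not None and version <= after_version:
                return False
            if to_version is not None and version > to_version:
                return False
            return True

        # .get() avoids creating an empty entry for unknown ids on read.
        stored = self._in_memory.get(originator_id, [])
        valid_events = filter(event_is_valid, stored)
        return sorted(
            valid_events, reverse=desc, key=lambda e: e.originator_version,
        )[:limit]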

Aggregate Base class

class Aggregate:
    def __init__(self, originator_id: UUID) -> None:
        self._id = originator_id
        self._version = 0
        self._pending_events: List[Event] = []

    @property
    def id(self) -> UUID:
        return self._id

    @property
    def version(self) -> int:
        return self._version

    @property
    def pending_events(self) -> Iterator[Event]:
        return iter(self._pending_events)

    def apply(self, event: TEvent) -> None:
        self._version = event.originator_version

    def _trigger_event(self, event_class: Type[TEvent], **kwargs: Any) -> None:
        new_event = event_class(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=datetime.now(tz=timezone.utc),
            **kwargs,
        )

        self.apply(new_event)
        self._pending_events.append(new_event)

Repository

class Repository(Generic[TAggregate]):
    def __init__(self, event_store: EventStore[TEvent]) -> None:
        self.event_store = event_store

    def apply(
            self, aggregate: TAggregate, version: Optional[int] = None,
    ) -> None:
        for event in self.event_store.get(aggregate.id, to_version=version):
            aggregate.apply(event)

    def save(self, aggregate: TAggregate) -> None:
        self.event_store.put(*aggregate.pending_events)

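    @contextmanager
    def aggregate(self, aggregate: TAggregate) -> Iterator[TAggregate]:
        # Replay stored events, hand the aggregate to the caller, then
        # persist the events triggered inside the with-block.
        self.apply(aggregate)
        yield aggregate
        self.save(aggregate)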
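
To show how the three pieces fit together, here is a condensed, self-contained sketch of a full round trip. It is not the experiment code: the `Issue` aggregate, the `IssueOpened` and `IssueResolved` events and the `open`/`resolve` methods are hypothetical, and the store, aggregate and repository are trimmed-down versions of the classes above so that the example runs on its own.

```python
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Iterator, List, Optional, Type
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Event:
    originator_id: UUID
    originator_version: int
    timestamp: datetime


class IssueOpened(Event):  # hypothetical domain event
    pass


class IssueResolved(Event):  # hypothetical domain event
    pass


class EventStore:
    def __init__(self) -> None:
        self._in_memory: Dict[UUID, List[Event]] = defaultdict(list)

    def put(self, *events: Event) -> None:
        for event in events:
            self._in_memory[event.originator_id].append(event)

    def get(self, originator_id: UUID) -> List[Event]:
        events = self._in_memory.get(originator_id, [])
        return sorted(events, key=lambda e: e.originator_version)


class Issue:
    def __init__(self, originator_id: UUID) -> None:
        self.id = originator_id
        self.version = 0
        self.state: Optional[str] = None
        self.pending_events: List[Event] = []

    def apply(self, event: Event) -> None:
        # State changes live in the aggregate, not in the event.
        if isinstance(event, IssueOpened):
            self.state = 'open'
        elif isinstance(event, IssueResolved):
            self.state = 'resolved'
        self.version = event.originator_version

    def _trigger_event(self, event_class: Type[Event]) -> None:
        event = event_class(
            self.id, self.version + 1, datetime.now(tz=timezone.utc),
        )
        self.apply(event)
        self.pending_events.append(event)

    def open(self) -> None:
        self._trigger_event(IssueOpened)

    def resolve(self) -> None:
        self._trigger_event(IssueResolved)


class Repository:
    def __init__(self, event_store: EventStore) -> None:
        self.event_store = event_store

    @contextmanager
    def aggregate(self, aggregate: Issue) -> Iterator[Issue]:
        for event in self.event_store.get(aggregate.id):
            aggregate.apply(event)
        yield aggregate
        self.event_store.put(*aggregate.pending_events)


repository = Repository(EventStore())
issue_id = uuid4()

with repository.aggregate(Issue(issue_id)) as issue:
    issue.open()

with repository.aggregate(Issue(issue_id)) as issue:
    issue.resolve()

# A fresh instance rebuilt purely from stored events.
with repository.aggregate(Issue(issue_id)) as replayed:
    pass
```

Each with-block starts from a fresh `Issue`, so the state seen inside it comes only from replayed events; after the three blocks the store holds two events and `replayed` ends up resolved at version 2.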

Update 2: Test framework

I decided to drop pytest and use the unittest framework for tests. Since I’m writing the code in pure Python, I thought it would be better to keep the whole project in pure Python. Also, I prefer unittest over pytest.
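
Running one suite against every handler is still possible without pytest's parametrization. One possible way, sketched below under assumptions, is to keep the scenarios in a mixin and generate one TestCase subclass per handler through unittest's `load_tests` protocol. The `HandlerA` and `HandlerB` classes, their `handle` method and the single scenario are hypothetical placeholders, not the handlers from the experiments.

```python
import unittest
from typing import List, Optional, Type


class HandlerA:  # hypothetical handler implementation
    def handle(self, command: str) -> str:
        return f'{command} handled'


class HandlerB:  # hypothetical handler implementation
    def handle(self, command: str) -> str:
        return command + ' handled'


class IssueScenarios:
    """Shared scenarios; not a TestCase, so it is never collected on its own."""
    handler_cls: Optional[type] = None

    def setUp(self) -> None:
        self.handler = self.handler_cls()

    def test_handles_command(self) -> None:
        self.assertEqual(
            self.handler.handle('CreateIssue'), 'CreateIssue handled',
        )


def make_cases(handlers: List[type]) -> List[Type[unittest.TestCase]]:
    # Build one concrete TestCase class per handler implementation.
    return [
        type(
            f'Test{handler.__name__}',
            (IssueScenarios, unittest.TestCase),
            {'handler_cls': handler},
        )
        for handler in handlers
    ]


def load_tests(loader, standard_tests, pattern):
    # unittest calls this hook when it loads the module.
    suite = unittest.TestSuite()
    for case in make_cases([HandlerA, HandlerB]):
        suite.addTests(loader.loadTestsFromTestCase(case))
    return suite
```

Adding a new implementation then means adding one class to the list passed to `make_cases`, similar to extending the `handlers` list in the pytest hook.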

Next steps

I will reimplement the experiments one by one and reflect on each of them in upcoming blog posts.

If you want to be notified, join my mailing list.
