The goal of the experiment is to implement the typical workflow of a Jira issue.
I want to use exactly the same test suite for every experiment. To achieve that, I’m using pytest with the
pytest_generate_tests hook, which parametrizes the whole test suite with every implemented handler.
    def pytest_generate_tests(metafunc):
        handlers = [
            aggregate_root.CommandHandler,
        ]
        names = [h.__module__ for h in handlers]
        metafunc.parametrize('handler_cls', handlers, ids=names)
The scenarios are as close to the Ruby version as I could make them.
    def test_start_from_stopped(self):
        self.arrange(CreateIssue, StartIssueProgress, StopIssueProgress)
        with self.assert_started():
            self.act(StartIssueProgress)

    def test_resolve_from_opened(self):
        self.arrange(CreateIssue)
        with self.assert_resolved():
            self.act(ResolveIssue)
The full test code is available here.
Here are the examples that I plan to reimplement.
- Classical example [original]
- Aggregate with exposed queries [original]
- Aggregate with extracted state [original]
- Functional aggregate [original]
- Polymorphic [original]
- Duck typing [original]
- Aggregate with yield [original]
- Aggregate with repository [original]
- Roles/DCI [original]
- POPO with attributes [original]
The Ruby experiments are based on the RailsEventStore library. An event store is not hard to write, but I decided to use a popular Python library. I found the eventsourcing framework, which is much more than an event store: it’s a full framework with many of the things I need for these experiments, so it seemed like a good choice. But after writing the tests and learning the framework, I have several objections to it.
Events mutate the Aggregate problem
This means that the Event object is not a DTO. The boundary of a component is defined by its commands and events, so if an event carries business logic, I cannot keep that implementation out of the tests.
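To make the difference concrete, here is a minimal sketch of the two styles. All names in it (IssueStartedWithLogic, IssueStarted, IssueA, IssueB) are hypothetical and made up for illustration; this is not the framework's actual API.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


# Style A: the event carries behaviour and mutates the aggregate itself.
class IssueStartedWithLogic:
    def __init__(self, originator_id: UUID) -> None:
        self.originator_id = originator_id

    def mutate(self, issue: "IssueA") -> None:
        # The business rule lives inside the event class.
        issue.status = "in_progress"


class IssueA:
    def __init__(self, issue_id: UUID) -> None:
        self.id = issue_id
        self.status = "open"

    def apply(self, event: IssueStartedWithLogic) -> None:
        event.mutate(self)


# Style B: the event is a plain, immutable DTO; the aggregate owns the rule.
@dataclass(frozen=True)
class IssueStarted:
    originator_id: UUID


class IssueB:
    def __init__(self, issue_id: UUID) -> None:
        self.id = issue_id
        self.status = "open"

    def apply(self, event: IssueStarted) -> None:
        if isinstance(event, IssueStarted):
            self.status = "in_progress"


issue_id = uuid4()

issue_a = IssueA(issue_id)
issue_a.apply(IssueStartedWithLogic(issue_id))

issue_b = IssueB(issue_id)
issue_b.apply(IssueStarted(issue_id))
```

In style B a test can build and compare events as plain data, without pulling in any aggregate logic through the event class.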
Multiple class inheritance problem
In my opinion, this is an anti-pattern because of the diamond problem. Java deliberately left multiple class inheritance out of the language, because it’s hard to track which functionality will run in which order. It also makes the implementation much harder to read.
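A small sketch of the diamond problem in Python, with hypothetical class names (Base, Left, Right, Child) chosen only for this example. It shows that the call order is decided by the method resolution order of the final class, not by what each parent class appears to extend.

```python
from typing import List


class Base:
    def describe(self) -> List[str]:
        return ["Base"]


class Left(Base):
    def describe(self) -> List[str]:
        # Reads as "call Base", but super() follows the MRO of the
        # concrete class, so for Child this calls Right.describe().
        return ["Left"] + super().describe()


class Right(Base):
    def describe(self) -> List[str]:
        return ["Right"] + super().describe()


class Child(Left, Right):
    def describe(self) -> List[str]:
        return ["Child"] + super().describe()


call_order = Child().describe()
mro_names = [cls.__name__ for cls in Child.__mro__]
```

Reading Left on its own, nothing suggests that Right will run in between it and Base, which is exactly what makes such hierarchies hard to follow.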
Artificial events that promote bad habits
One of the basic pre-implemented events in this framework is AttributeChanged. This kind of event is what I call a ‘mechanical’ event. It does not deliver any domain knowledge and, in my opinion, it goes against DDD. I understand that it can sometimes be helpful in non-DDD components, but shipping it as one of the default events promotes bad habits.
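A rough illustration of the difference, assuming two hypothetical event classes. The AttributeChanged below is a simplified stand-in written for this example, not the framework's own class, and IssueResolved is likewise invented here.

```python
from dataclasses import dataclass
from typing import Any
from uuid import UUID, uuid4


@dataclass(frozen=True)
class AttributeChanged:
    """Simplified stand-in for a 'mechanical' event (hypothetical)."""
    originator_id: UUID
    name: str
    value: Any


@dataclass(frozen=True)
class IssueResolved:
    """Domain event: the type itself says what happened (hypothetical)."""
    originator_id: UUID


def is_resolution_mechanical(event: object) -> bool:
    # The consumer has to know which attribute name and value encode
    # the business fact.
    return (
        isinstance(event, AttributeChanged)
        and event.name == "status"
        and event.value == "resolved"
    )


def is_resolution_domain(event: object) -> bool:
    # The consumer only needs the event type.
    return isinstance(event, IssueResolved)


issue_id = uuid4()
mechanical = AttributeChanged(issue_id, name="status", value="resolved")
domain = IssueResolved(issue_id)
```

With the mechanical event, the domain meaning is hidden in string values that every consumer has to interpret the same way.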
A potential issue with resilience
The event store is just another listener to events. So in a distributed system, writing an event to the SQL database in a transaction can fail while emitting the same event to a queue succeeds. This leaves the systems in an inconsistent state. I would prefer the event store to save and commit the events to the database before sending them to other listeners. The framework’s strategy is fine when everything runs in one application within the same transaction.
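The ordering concern can be sketched as follows. Everything here (InMemoryStore, the two publish functions, the list used as a queue) is hypothetical and only models the failure scenario; it is not code from the framework.

```python
from typing import Callable, List

Listener = Callable[[str], None]


class InMemoryStore:
    """Hypothetical store that can be told to fail on commit."""

    def __init__(self, fail: bool = False) -> None:
        self.events: List[str] = []
        self.fail = fail

    def put(self, event: str) -> None:
        if self.fail:
            raise RuntimeError("commit failed")
        self.events.append(event)


def publish_to_equal_listeners(event: str, listeners: List[Listener]) -> None:
    # The store is one listener among others, so earlier listeners
    # have already received the event when the store fails.
    for listener in listeners:
        listener(event)


def publish_store_first(
    event: str, store: InMemoryStore, listeners: List[Listener]
) -> None:
    # Commit first; if this raises, no other listener sees the event.
    store.put(event)
    for listener in listeners:
        listener(event)


queue_a: List[str] = []
store_a = InMemoryStore(fail=True)
try:
    publish_to_equal_listeners("IssueCreated", [queue_a.append, store_a.put])
except RuntimeError:
    pass  # queue_a already holds the event, the store does not

queue_b: List[str] = []
store_b = InMemoryStore(fail=True)
try:
    publish_store_first("IssueCreated", store_b, [queue_b.append])
except RuntimeError:
    pass  # neither the store nor the queue holds the event
```

This only covers a failed commit; a listener that fails after a successful commit is a separate problem that the sketch does not address.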
Update 1: Framework
After the first experiment, the eventsourcing framework released version 9.0.0, which is not compatible with the previous one. It resolved issues such as multiple class inheritance and promoting bad habits, and it simplified the code a lot. But my main concern was events mutating the aggregate, and the new version enforces that even more strongly. So I decided to write a simplified event sourcing library for my experiments. It’s in-memory only and focused on reproducing the Arkency examples.
    class EventStore(Generic[TEvent]):
        def __init__(self) -> None:
            # Events are grouped by the id of the aggregate that produced them.
            self._in_memory: Dict[UUID, List[TEvent]] = defaultdict(list)

        def put(self, *events: TEvent) -> None:
            for event in events:
                self._in_memory[event.originator_id].append(event)

        def get(
            self,
            originator_id: UUID,
            after_version: Optional[int] = None,
            to_version: Optional[int] = None,
            desc: bool = False,
            limit: Optional[int] = None,
        ) -> List[TEvent]:
            def event_is_valid(event: TEvent) -> bool:
                version = event.originator_version
                if after_version is not None and not (version > after_version):
                    return False
                if to_version is not None and not (version <= to_version):
                    return False
                return True

            valid_events = filter(event_is_valid, self._in_memory[originator_id])
            # Sort by version, then cut to the requested limit (None keeps all).
            return sorted(
                valid_events,
                reverse=desc,
                key=lambda e: e.originator_version,
            )[:limit]
Aggregate Base class
    class Aggregate:
        def __init__(self, originator_id: UUID) -> None:
            self._id = originator_id
            self._version = 0
            self._pending_events: List[Event] = []

        @property
        def id(self) -> UUID:
            return self._id

        @property
        def version(self) -> int:
            return self._version

        @property
        def pending_events(self) -> Iterator[Event]:
            return iter(self._pending_events)

        def apply(self, event: TEvent) -> None:
            self._version = event.originator_version

        def _trigger_event(self, event_class: Type[TEvent], **kwargs: Any) -> None:
            new_event = event_class(
                originator_id=self.id,
                originator_version=self.version + 1,
                timestamp=datetime.now(tz=timezone.utc),
                **kwargs,
            )
            # Apply the new event right away and keep it until it is saved.
            self.apply(new_event)
            self._pending_events.append(new_event)
    class Repository(Generic[TAggregate]):
        def __init__(self, event_store: EventStore[TEvent]) -> None:
            self.event_store = event_store

        def apply(
            self,
            aggregate: TAggregate,
            version: Optional[int] = None,
        ) -> None:
            # Replay stored events to rebuild the aggregate state.
            for event in self.event_store.get(aggregate.id, to_version=version):
                aggregate.apply(event)

        def save(self, aggregate: TAggregate) -> None:
            self.event_store.put(*aggregate.pending_events)

        @contextmanager
        def aggregate(self, aggregate: TAggregate) -> Iterator[TAggregate]:
            self.apply(aggregate)
            yield aggregate
            self.save(aggregate)
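To show how the three pieces might fit together, here is a condensed, self-contained sketch. It is deliberately simplified compared to the classes above (no generics, no version filtering), and the Event base class, the IssueCreated and IssueResolved events, and the Issue aggregate are assumptions made for this example.

```python
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Iterator, List, Optional, Type
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Event:
    originator_id: UUID
    originator_version: int
    timestamp: datetime


class IssueCreated(Event):
    pass


class IssueResolved(Event):
    pass


class EventStore:
    def __init__(self) -> None:
        self._in_memory: Dict[UUID, List[Event]] = defaultdict(list)

    def put(self, *events: Event) -> None:
        for event in events:
            self._in_memory[event.originator_id].append(event)

    def get(self, originator_id: UUID) -> List[Event]:
        events = self._in_memory[originator_id]
        return sorted(events, key=lambda e: e.originator_version)


class Issue:
    def __init__(self, originator_id: UUID) -> None:
        self.id = originator_id
        self.version = 0
        self.status: Optional[str] = None
        self.pending_events: List[Event] = []

    def apply(self, event: Event) -> None:
        # The aggregate, not the event, decides how state changes.
        self.version = event.originator_version
        if isinstance(event, IssueCreated):
            self.status = "open"
        elif isinstance(event, IssueResolved):
            self.status = "resolved"

    def _trigger_event(self, event_class: Type[Event]) -> None:
        event = event_class(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=datetime.now(tz=timezone.utc),
        )
        self.apply(event)
        self.pending_events.append(event)

    def create(self) -> None:
        self._trigger_event(IssueCreated)

    def resolve(self) -> None:
        self._trigger_event(IssueResolved)


class Repository:
    def __init__(self, event_store: EventStore) -> None:
        self.event_store = event_store

    @contextmanager
    def aggregate(self, aggregate: Issue) -> Iterator[Issue]:
        # Load: replay history, hand over the aggregate, then save new events.
        for event in self.event_store.get(aggregate.id):
            aggregate.apply(event)
        yield aggregate
        self.event_store.put(*aggregate.pending_events)


store = EventStore()
repository = Repository(store)
issue_id = uuid4()

with repository.aggregate(Issue(issue_id)) as issue:
    issue.create()

with repository.aggregate(Issue(issue_id)) as issue:
    issue.resolve()

# A fresh instance is rebuilt purely from the stored events.
reloaded = Issue(issue_id)
with repository.aggregate(reloaded):
    pass
```

Each block works on a fresh Issue instance, so the aggregate state always comes from replaying what is in the store.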
Update 2: Test framework
I decided to drop pytest and use the unittest framework for the tests. Since I’m writing the code in pure Python, I thought it would be better to have the whole project in pure Python. I also prefer unittest over pytest.
I will reimplement the experiments one by one and reflect on each in upcoming blog posts.
If you want to be notified, join my mailing list.
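One possible way to keep running the same suite against every handler without pytest is the load_tests protocol from unittest. This is only a sketch of that idea, not necessarily how the project does it; FirstHandler, SecondHandler, HANDLERS and make_test_case are hypothetical names invented for this example.

```python
import unittest


class FirstHandler:
    """Hypothetical handler used only for this sketch."""

    def __init__(self) -> None:
        self.status = "open"

    def resolve(self) -> None:
        self.status = "resolved"


class SecondHandler(FirstHandler):
    """Second hypothetical implementation with the same interface."""


HANDLERS = [FirstHandler, SecondHandler]


def make_test_case(handler_cls):
    # Build one TestCase class per handler so every scenario runs
    # against every implementation.
    class IssueScenarios(unittest.TestCase):
        def setUp(self) -> None:
            self.handler = handler_cls()

        def test_resolve_from_opened(self) -> None:
            self.handler.resolve()
            self.assertEqual(self.handler.status, "resolved")

    IssueScenarios.__name__ = f"IssueScenarios_{handler_cls.__name__}"
    return IssueScenarios


def load_tests(loader, standard_tests, pattern):
    # unittest calls this hook when it loads the module.
    suite = unittest.TestSuite()
    for handler_cls in HANDLERS:
        suite.addTests(loader.loadTestsFromTestCase(make_test_case(handler_cls)))
    return suite


# Run the generated suite directly, outside of the unittest runner.
suite = load_tests(unittest.defaultTestLoader, unittest.TestSuite(), None)
result = unittest.TestResult()
suite.run(result)
```

The test class is created inside a function so the module itself exposes no TestCase that would be collected twice.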