Chekov
CQRS/ES have been trending topics for some time now, and there are some pretty nice libraries out there to build applications following these patterns. But I found myself missing those tools to build Rust applications for a while, so I decided to try to fill this gap by implementing them myself.
In this blog series I will cover my way of building an open-source framework called Chekov. I will describe how I design and implement the whole thing. I will start from scratch, I will fail sometimes, but I will learn a lot, and so will you. I will try to cover as much of my reasoning as possible; feel free to ask for more details on Twitter or at the end of each blog post.
But you may have a question: what the f*ck is Chekov?
Chekov is a Rust framework to help people build CQRS/ES applications. It aims to provide everything you need to dispatch commands, produce events, and build projections.
There are more complex and powerful features in the pipe, but we will talk about them later in the process!
For now we will focus on having something that can work for the simplest project, and we will iterate together on the features along the way! (Feel free to open an issue on the repository to provide feedback or feature suggestions!)
CQRS (Command Query Responsibility Segregation) is a pattern that separates Commands (writes) from Queries (reads). What does this mean? It splits the application layer into a command stack and a query stack.
- Commands are operations that change the application state and don’t return data.
- Queries are operations that return data without changing the application state.
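This split can be sketched in a few lines of Rust (the `BankAccount` example is purely illustrative, not part of Chekov):

```rust
// Illustrative example of the command/query split; not Chekov's API.
#[derive(Default)]
struct BankAccount {
    balance: i64,
}

// A command changes the state and returns no data (only success or failure).
fn deposit(account: &mut BankAccount, amount: i64) -> Result<(), String> {
    if amount <= 0 {
        return Err("amount must be positive".into());
    }
    account.balance += amount;
    Ok(())
}

// A query returns data without changing the state, which the
// shared `&BankAccount` borrow enforces at compile time.
fn current_balance(account: &BankAccount) -> i64 {
    account.balance
}

fn main() {
    let mut account = BankAccount::default();
    deposit(&mut account, 100).unwrap();
    println!("balance: {}", current_balance(&account)); // prints "balance: 100"
}
```

Note how Rust’s borrow rules map nicely onto the pattern: commands take `&mut`, queries take `&`.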
Here’s a representation of how it works:
EventSourcing is a pattern that persists the state of a business entity as a sequence of events. When the state of a business entity changes, a new event is appended to the entity’s events list. An entity’s state is an aggregation of all the entity’s events, meaning that you can rebuild the entity’s state by consuming the events from the origin if you want to.
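A minimal sketch of that rebuild-by-replay idea (illustrative types, not Chekov’s):

```rust
// Illustrative only: an entity's state is the fold of its past events.
enum BankEvent {
    MoneyDeposited { amount: i64 },
    MoneyWithdrawn { amount: i64 },
}

#[derive(Default, Debug, PartialEq)]
struct BankAccount {
    balance: i64,
}

impl BankAccount {
    // Applying an event never mutates history; it derives the next state.
    fn apply(mut self, event: &BankEvent) -> Self {
        match event {
            BankEvent::MoneyDeposited { amount } => self.balance += amount,
            BankEvent::MoneyWithdrawn { amount } => self.balance -= amount,
        }
        self
    }
}

// Rebuild the current state by consuming the events from the origin.
fn replay(events: &[BankEvent]) -> BankAccount {
    events.iter().fold(BankAccount::default(), BankAccount::apply)
}

fn main() {
    let history = vec![
        BankEvent::MoneyDeposited { amount: 100 },
        BankEvent::MoneyWithdrawn { amount: 30 },
    ];
    println!("{:?}", replay(&history)); // prints "BankAccount { balance: 70 }"
}
```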
Where to start with this?
First I had to figure out how to split this large project into little pieces to be able to iterate.
I already know a little about CQRS/ES because I have used this pattern a number of times for both personal and professional applications. I have contributed to several discussions and open-source projects like Commanded (a really nice Elixir library and a real inspiration for Chekov).
But I wanted to be clear on what I will be doing for the next iterations. Diving straight into code can be a trap, so I want to produce step-by-step milestones and iterate over time to improve the whole thing. That being said, I will certainly build something and then refactor it in the next iteration, and that’s OK!
I want chekov to be modular and easy to use, meaning that the first thing to do is to split the event sourcing from the CQRS and so on. For now I will define only three modules in the framework:
- `event_store` is responsible for appending events to the store and reading them on demand
- `command` is responsible for handling input commands and executing them on handlers
- `chekov` which is the glue between the others
It seems simple, but each one of them has its own domain.
Do we want the command module to know how events are saved to the store? I don’t think so.
Do we want the command module to know how the events are serialized? No.
Do we want the event_store to know that a given stream is an aggregate stream? Naahhh.
As you can see there are plenty of questions to ask and to answer on this subject. Some of you may know a lot more about that than me so feel free to contribute!
I will start by defining and designing the basic elements of the `event_store`, which seems to be the base of the whole framework.
The event store
Let’s define some key principles for our event store (these specs can evolve during implementation; for up-to-date documentation, go check the repository!).
Define the concept
Our `event_store` will deal with `Event`s and `Stream`s, which will be stored in a `Storage`. The `event_store` can use different kinds of `Storage` but will expose the same public API, allowing users to interact with multiple `Storage`s of the same or different types.
An `Event` is a semantic log describing something that has happened in the past; some call them Domain Events. This means the `event_store` will never modify the event itself in any way. An `Event` is immutable and can’t be rolled back or modified. As you can see, I don’t talk about how events are generated because that’s out of scope. The `event_store` doesn’t generate `Event`s; it exposes a public type that defines what an `Event` should look like, but nothing more.
A `Stream` is a suite of events that can only grow in size. A `Stream` doesn’t own the `Event`s; instead, a link is created between `Stream` and `Event`. An `Event` can appear in multiple `Stream`s, but an `Event` is useless if it isn’t linked to a `Stream`: you can’t publish an event without linking it to a `Stream`, because if it weren’t linked, no one would be able to read it.
A `StreamEvent` is the link between an `Event` and a `Stream`.
A `Storage` is where the `Event`s and `Stream`s are stored.
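To make the relationship between events and streams concrete, here is an illustrative sketch of the link (field choices are my assumption, not Chekov’s final types):

```rust
// Illustrative data model only; Chekov's actual types are defined later.
// A `StreamEvent` links one event to one stream, each link carrying the
// version of that particular stream.
struct StreamEvent {
    event_uuid: String,
    stream_uuid: String,
    stream_version: i32,
}

// List every stream a given event appears in.
fn streams_for<'a>(event_uuid: &str, links: &'a [StreamEvent]) -> Vec<&'a str> {
    links
        .iter()
        .filter(|link| link.event_uuid == event_uuid)
        .map(|link| link.stream_uuid.as_str())
        .collect()
}

fn main() {
    // The same event linked into two different streams:
    let links = vec![
        StreamEvent {
            event_uuid: "e1".into(),
            stream_uuid: "account-1".into(),
            stream_version: 1,
        },
        StreamEvent {
            event_uuid: "e1".into(),
            stream_uuid: "all-accounts".into(),
            stream_version: 42,
        },
    ];
    println!("{:?}", streams_for("e1", &links)); // prints ["account-1", "all-accounts"]
}
```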
Now that we have a little idea of the pieces, let’s design the first one: the `Storage`.
Note that everything defined here will be persisted in some way, but it’s too early to define how.
Design: The storage
The `Storage` is the element responsible for storing the actual `Event`s and for managing `Stream` creation and modification.
We can use `Storage` as an abstraction over a `Backend` service, which can be a database or anything else.
We also want to be able to define different kinds of `Storage` for our `event_store`; every `Storage` will have the same public API but a different implementation under the hood.
Let’s define what a `Storage` looks like and what we want to provide as a public API.
A `Storage` must have:
- an API to manage `Stream`s
- an API to append and read `Event`s
Because we want the `Storage` to be implemented by multiple `Backend`s, we will use a trait.
/// A `Storage` is responsible for storing and managing `Stream` and `Event` for a `Backend`
pub trait Storage {
/// Create a new stream with an identifier
///
/// # Errors
/// The stream creation can fail for multiple reasons:
///
/// - pure storage failure (unable to create the stream on the backend)
/// - The stream already exists
/// - The `stream_uuid` is invalid
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError>;
/// Delete a stream from the `Backend`
///
/// Do we need to provide a hard/soft deletion?
///
/// # Errors
/// The stream deletion can fail for multiple reasons:
///
/// - pure storage failure (unable to delete the stream on the backend)
/// - The stream doesn't exists
/// - The `stream_uuid` is invalid
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError>;
}
mod inmemory;
#[cfg(test)]
mod test;
/// Types that will be implemented later
#[derive(Debug, PartialEq)]
pub struct Stream {
deleted: bool,
}
#[derive(Debug, PartialEq)]
pub enum StreamCreationError {
AlreadyExists,
MalformedStreamUUID,
StorageError(StorageError),
}
#[derive(Debug, PartialEq)]
pub enum StreamDeletionError {
DoesntExists,
MalformedStreamUUID,
StorageError(StorageError),
}
#[derive(Debug, PartialEq)]
pub enum StorageError {
Unknown,
}
Now that we have a `Storage` trait, we need to implement it in our first `Backend`, which will be an `InMemory` `Backend`.
use crate::storage::{Storage, Stream, StreamCreationError, StreamDeletionError};
use std::collections::HashMap;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
unimplemented!()
}
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
Ok(())
}
}
Our Backend implementation looks like this for now. Let’s add some tests:
use uuid::Uuid;
use crate::storage::{
inmemory::InMemoryBackend, Storage, StreamCreationError, StreamDeletionError,
};
mod creation {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage.create_stream(&uuid).is_ok());
}
#[test]
fn fail_if_stream_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage.create_stream(&uuid).is_ok());
assert_eq!(
storage.create_stream(&uuid),
Err(StreamCreationError::AlreadyExists)
);
}
#[test]
fn fail_if_stream_uuid_malformed() {
let mut storage = InMemoryBackend::default();
assert_eq!(
storage.create_stream("an uuid"),
Err(StreamCreationError::MalformedStreamUUID)
);
}
}
mod deletion {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage.create_stream(&uuid).is_ok());
assert!(storage.delete_stream(&uuid).is_ok());
}
#[test]
fn fail_if_stream_doesnt_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert_eq!(
storage.delete_stream(&uuid),
Err(StreamDeletionError::DoesntExists)
);
}
#[test]
fn fail_if_stream_uuid_malformed() {
let mut storage = InMemoryBackend::default();
assert_eq!(
storage.delete_stream("an uuid"),
Err(StreamDeletionError::MalformedStreamUUID)
);
}
}
running 6 tests
test storage::test::creation::fail_if_stream_uuid_malformed ... FAILED
test storage::test::creation::fail_if_stream_exists ... FAILED
test storage::test::creation::success ... FAILED
test storage::test::deletion::fail_if_stream_doesnt_exists ... FAILED
test storage::test::deletion::fail_if_stream_uuid_malformed ... FAILED
test storage::test::deletion::success ... FAILED
failures:
---- storage::test::creation::fail_if_stream_uuid_malformed stdout ----
thread 'storage::test::creation::fail_if_stream_uuid_malformed' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
---- storage::test::creation::fail_if_stream_exists stdout ----
thread 'storage::test::creation::fail_if_stream_exists' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
---- storage::test::creation::success stdout ----
thread 'storage::test::creation::success' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
---- storage::test::deletion::fail_if_stream_doesnt_exists stdout ----
thread 'storage::test::deletion::fail_if_stream_doesnt_exists' panicked at 'assertion failed: `(left == right)`
left: `Ok(())`,
right: `Err(DoesntExists)`', event_store/src/storage/test.rs:60:9
---- storage::test::deletion::fail_if_stream_uuid_malformed stdout ----
thread 'storage::test::deletion::fail_if_stream_uuid_malformed' panicked at 'assertion failed: `(left == right)`
left: `Ok(())`,
right: `Err(MalformedStreamUUID)`', event_store/src/storage/test.rs:70:9
---- storage::test::deletion::success stdout ----
thread 'storage::test::deletion::success' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
failures:
storage::test::creation::fail_if_stream_exists
storage::test::creation::fail_if_stream_uuid_malformed
storage::test::creation::success
storage::test::deletion::fail_if_stream_doesnt_exists
storage::test::deletion::fail_if_stream_uuid_malformed
storage::test::deletion::success
test result: FAILED. 0 passed; 6 failed; 0 ignored; 0 measured; 0 filtered out
Let’s implement the inner `create_stream` and `delete_stream` methods then.
use crate::storage::{Storage, Stream, StreamCreationError, StreamDeletionError};
use std::collections::HashMap;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
// Move this in a generic function later
// stream_uuid must be validated way before this point
if stream_uuid.contains(' ') {
return Err(StreamCreationError::MalformedStreamUUID);
}
if self.streams.contains_key(stream_uuid) {
return Err(StreamCreationError::AlreadyExists);
}
self.streams
.insert(stream_uuid.to_owned(), Stream { deleted: false });
Ok(self.streams.get(stream_uuid).unwrap())
}
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
// Move this in a generic function later
// stream_uuid must be validated way before this point
if stream_uuid.contains(' ') {
return Err(StreamDeletionError::MalformedStreamUUID);
}
if !self.streams.contains_key(stream_uuid) {
return Err(StreamDeletionError::DoesntExists);
}
self.streams.remove(stream_uuid);
Ok(())
}
}
running 6 tests
test storage::test::creation::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::fail_if_stream_doesnt_exists ... ok
test storage::test::creation::fail_if_stream_exists ... ok
test storage::test::creation::success ... ok
test storage::test::deletion::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::success ... ok
test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Design: The Event
Now that we are able to create and delete `Stream`s, we need to be able to create `Event`s. We will talk about different kinds of events, so let’s define some key principles to make it crystal clear.
- An `Event` defined outside of the `event_store` crate is a `DomainEvent`, which represents an actual domain event (i.e. `MoneyDeposited`, `MoneyWithdrawn`).
- A `DomainEvent` can be transformed into an `UnsavedEvent`, which is part of the `event_store` crate. An `UnsavedEvent` doesn’t hold the `DomainEvent` type, to be able to publish multiple kinds of events at the same time.
- A `RecordedEvent` is defined and generated inside the `event_store` crate and can be transformed into a `DomainEvent` by the end user.
Here’s a little graph to explain those interactions:
An `Event` must have:
- an `event_uuid` which is unique and identifies the event
- a `correlation_id` which correlates multiple events
- a `causation_id` which defines who caused this event
- a `type` which is human readable
- a `data` attribute which is defined by the library user
- a `metadata` attribute which is defined by the library user or by the `event_store` itself
- a `created_at` value that defines when the event was created
So let’s define our `Event` trait and the `UnsavedEvent` type. As we defined earlier, an `Event` is what the `event_store` expects as input. It needs to match some behaviour to be considered a valid event in our predefined model. An `UnsavedEvent` must have a `type`, a `data` attribute containing the serialized `DomainEvent`, some `metadata`, and two optional fields, `correlation_id` and `causation_id`.
Because we need to be able to generate an `UnsavedEvent` from a type that implements `Event`, we need to provide a `try_from` implementation to handle this. The `Event` trait must provide information that we can’t infer (like the `event_type`); we could have used the generic `std::any::type_name` method, but it wouldn’t be helpful for an `Enum` because we would be missing the variant part (we could solve that with `serde`¹). We will define an `event_type` method returning this information.
The `data` of the `Event` must be automatically serializable using `serde`; I’m thinking of using `serde_json` serialization to store the event. But for now we will just say that `Event` needs to implement `serde::Serialize`.
use serde::Serialize;
/// Represent event that can be handled by an `EventStore`
pub trait Event: Serialize {
/// Return a static str which define the event type
///
/// This str must be as precise as possible.
fn event_type(&self) -> &'static str;
}
mod unsaved;
mod recorded;
We can now define our `UnsavedEvent` struct:
use super::Event;
use serde_json;
use uuid::Uuid;
/// An `UnsavedEvent` is created from a type that implements `Event`
///
/// This kind of event represents an unsaved event, meaning that it has less information
/// than a `RecordedEvent`. It's a generic form to simplify the event processing but also a way to
/// define `metadata`, `causation_id` and `correlation_id`
pub struct UnsavedEvent {
/// a `causation_id` defines who caused this event
causation_id: Option<Uuid>,
/// a `correlation_id` correlates multiple events
correlation_id: Option<Uuid>,
/// Human readable event type
event_type: String,
/// Payload of this event
data: String,
/// Metadata defined for this event
metadata: String,
}
#[derive(Debug)]
pub enum ParseEventError {
UnknownFailure,
}
impl From<serde_json::Error> for ParseEventError {
fn from(_: serde_json::Error) -> Self {
Self::UnknownFailure
}
}
impl UnsavedEvent {
pub(crate) fn try_from<E: Event>(event: &E) -> Result<Self, ParseEventError> {
Ok(Self {
causation_id: None,
correlation_id: None,
event_type: event.event_type().to_owned(),
data: serde_json::to_string(&event)?,
metadata: String::new(),
})
}
}
And adding some tests to validate that this simple implementation works:
use crate::event::{unsaved::UnsavedEvent, Event};
use super::*;
#[derive(Serialize)]
pub struct MyStructEvent {}
#[derive(Serialize)]
pub enum MyEnumEvent {
Created { id: i32 },
Updated(String),
Deleted,
}
impl Event for MyEnumEvent {
fn event_type(&self) -> &'static str {
match *self {
Self::Deleted => "MyEnumEvent::Deleted",
Self::Created { .. } => "MyEnumEvent::Created",
Self::Updated(_) => "MyEnumEvent::Updated",
}
}
}
impl Event for MyStructEvent {
fn event_type(&self) -> &'static str {
"MyStructEvent"
}
}
mod unsaved {
use super::*;
#[test]
fn must_have_a_valide_event_type() {
let source_events = vec![
MyEnumEvent::Created { id: 1 },
MyEnumEvent::Updated("Updated".into()),
MyEnumEvent::Deleted,
];
let mut produces_events: Vec<UnsavedEvent> = source_events
.iter()
.map(UnsavedEvent::try_from)
.filter(Result::is_ok)
.map(Result::unwrap)
.collect();
let next = MyStructEvent {};
produces_events.push(UnsavedEvent::try_from(&next).unwrap());
let mut expected = vec![
"MyEnumEvent::Created",
"MyEnumEvent::Updated",
"MyEnumEvent::Deleted",
"MyStructEvent",
];
expected
.into_iter()
.zip(produces_events)
.for_each(|(ex, real)| assert_eq!(ex, real.event_type));
}
}
running 7 tests
test event::test::unsaved::must_have_a_valide_event_type ... ok
test storage::test::creation::fail_if_stream_exists ... ok
test storage::test::creation::fail_if_stream_uuid_malformed ... ok
test storage::test::creation::success ... ok
test storage::test::deletion::fail_if_stream_doesnt_exists ... ok
test storage::test::deletion::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::success ... ok
test result: ok. 7 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
As we can see, the `try_from` implementation isn’t really useful because we can’t specify `causation_id`, `correlation_id` nor `metadata`. We will switch to a `Builder` in the next iteration.
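One possible shape for that builder, sketched with plain `String` identifiers so the snippet stands alone (the real crate uses `Uuid` and would serialize an `Event`; everything here is a hypothetical draft, not the final API):

```rust
// Hypothetical builder sketch; not Chekov's final API.
#[derive(Debug, Default)]
pub struct UnsavedEvent {
    causation_id: Option<String>,
    correlation_id: Option<String>,
    event_type: String,
    data: String,
    metadata: String,
}

#[derive(Default)]
pub struct UnsavedEventBuilder {
    causation_id: Option<String>,
    correlation_id: Option<String>,
    metadata: Option<String>,
}

impl UnsavedEventBuilder {
    // Each setter consumes and returns the builder, allowing chaining.
    pub fn causation_id(mut self, id: impl Into<String>) -> Self {
        self.causation_id = Some(id.into());
        self
    }

    pub fn correlation_id(mut self, id: impl Into<String>) -> Self {
        self.correlation_id = Some(id.into());
        self
    }

    pub fn metadata(mut self, metadata: impl Into<String>) -> Self {
        self.metadata = Some(metadata.into());
        self
    }

    // In the real crate this would take a type implementing `Event` and
    // serialize it; the pre-serialized parts are taken here for brevity.
    pub fn build(self, event_type: impl Into<String>, data: impl Into<String>) -> UnsavedEvent {
        UnsavedEvent {
            causation_id: self.causation_id,
            correlation_id: self.correlation_id,
            event_type: event_type.into(),
            data: data.into(),
            metadata: self.metadata.unwrap_or_default(),
        }
    }
}

fn main() {
    let event = UnsavedEventBuilder::default()
        .correlation_id("a-correlation-id")
        .metadata("{}")
        .build("MyEnumEvent::Created", "{\"id\":1}");
    println!("{:?}", event);
}
```

The consuming-setter style keeps every optional field optional while the mandatory parts (`event_type`, `data`) are forced through `build`.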
The `RecordedEvent` will have much more information than the `UnsavedEvent`. In our pattern, an `Event` can’t be saved without a `Stream`, and there is no point in retrieving an `Event` outside a `Stream`. The `EventStore` will expose methods to interact with `Stream`s, which will return `RecordedEvent`s. Our `RecordedEvent`, which can be seen as a `StreamEvent`, will carry every piece of information related to that `Stream` version of the inserted `Event`.
Let’s talk a little more about what we defined.
- A `RecordedEvent` is the aggregation of information from both `StreamEvent` and `Event`
- A `RecordedEvent` must have an `event_number` related to the `stream_version`
- A `RecordedEvent` must have a unique `event_uuid`
- A `RecordedEvent` must have a `stream_uuid`
- A `RecordedEvent` must have a `stream_version`
use uuid::Uuid;
/// A `RecordedEvent` represents an `Event` which has been appended to a `Stream`
pub struct RecordedEvent {
/// an incrementing and gapless integer used to order the event in a stream.
event_number: i32,
/// Unique identifier for this event
event_uuid: Uuid,
/// The stream identifier for this event
stream_uuid: Uuid,
/// The stream version when this event was appended
stream_version: i32,
/// a `causation_id` defines who caused this event
causation_id: Option<Uuid>,
/// a `correlation_id` correlates multiple events
correlation_id: Option<Uuid>,
/// Human readable event type
event_type: String,
/// Payload of this event
data: String,
/// Metadata defined for this event
metadata: String,
/// Event time creation
created_at: String,
}
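Since a `RecordedEvent` is meant to be transformed back into a `DomainEvent` by the end user, that conversion could look roughly like this (a hand-rolled, hypothetical sketch; the real crate will likely lean on `serde` to deserialize `data` instead):

```rust
use std::convert::TryFrom;

// Minimal stand-in for the `RecordedEvent` above; only the fields we need here.
pub struct RecordedEvent {
    event_type: String,
    data: String,
}

// A domain event on the client side.
#[derive(Debug, PartialEq)]
pub enum MyDomainEvent {
    Deleted,
}

impl TryFrom<RecordedEvent> for MyDomainEvent {
    type Error = String;

    // Dispatch on the human-readable `event_type`; a real implementation
    // would also deserialize `data` into the variant's fields.
    fn try_from(event: RecordedEvent) -> Result<Self, Self::Error> {
        match event.event_type.as_str() {
            "MyDomainEvent::Deleted" => Ok(MyDomainEvent::Deleted),
            other => Err(format!("unknown event type: {}", other)),
        }
    }
}

fn main() {
    let recorded = RecordedEvent {
        event_type: "MyDomainEvent::Deleted".into(),
        data: "null".into(),
    };
    println!("{:?}", MyDomainEvent::try_from(recorded).unwrap()); // prints "Deleted"
}
```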
Now that we have a `Storage` and some `Event`s, let’s talk about the next big subject.
Design: The Stream
A `Stream` can be seen as a bunch of events representing a concept. An aggregate’s state is a `Stream`, for example. But the `Stream` concept can go way further: instead of having only aggregate-related `Stream`s, we can define much more useful things like feature-related `Stream`s (e.g. a `Stream` containing all `AccountCreated` events, another one containing all user-related events, etc.).
But what’s a `Stream` if we put the events aside? A `Stream` holds information like its current `version`, a `stream_uuid` (which will be discussed further later) and of course a `created_at` and a `deleted_at`. The link between `Stream` and `Event` is the `StreamEvent`, which can be transformed into a `RecordedEvent` by the `EventStore`.
A `Stream` must have:
- a `stream_uuid` which is unique and identifies the stream
- a `version` which defines the current version of this stream
A `StreamEvent` must have:
- an `event_uuid` which is equal to the `Event`’s `event_uuid`
- a `stream_uuid` which is equal to the `Stream`’s `stream_uuid`
- a `stream_version` which defines the version of the stream for this event
Let’s define our `Stream` struct then.
/// A `Stream` represents an `Event` stream
#[derive(Debug, PartialEq)]
pub struct Stream {
/// The stream identifier which is unique
stream_uuid: String,
/// The current stream version number
stream_version: i32,
/// The creation date of the stream
created_at: String,
/// The deletion date of the stream
deleted_at: String,
}
impl Stream {
pub fn stream_uuid(&self) -> &str {
self.stream_uuid.as_ref()
}
}
#[derive(Debug, PartialEq)]
pub enum StreamError {
MalformedStreamUUID,
}
impl std::str::FromStr for Stream {
type Err = StreamError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains(' ') {
return Err(StreamError::MalformedStreamUUID);
}
Ok(Self {
stream_uuid: s.into(),
stream_version: 0,
created_at: String::new(),
deleted_at: String::new(),
})
}
}
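Usage then goes through `FromStr`, which also enables the `str::parse` sugar. A standalone sketch (repeating a minimal `Stream` so the snippet compiles on its own):

```rust
use std::str::FromStr;

// Minimal copy of the `Stream` type above so this snippet stands alone.
#[derive(Debug, PartialEq)]
pub struct Stream {
    stream_uuid: String,
    stream_version: i32,
}

#[derive(Debug, PartialEq)]
pub enum StreamError {
    MalformedStreamUUID,
}

impl FromStr for Stream {
    type Err = StreamError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.contains(' ') {
            return Err(StreamError::MalformedStreamUUID);
        }
        Ok(Self {
            stream_uuid: s.into(),
            stream_version: 0,
        })
    }
}

fn main() {
    // Explicit call...
    let stream = Stream::from_str("account-1").unwrap();
    println!("{}", stream.stream_uuid); // prints "account-1"
    // ...or through the `parse` sugar, with validation for free.
    assert_eq!(
        "not a uuid".parse::<Stream>(),
        Err(StreamError::MalformedStreamUUID)
    );
}
```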
As you can see, I moved the `MalformedStreamUUID` validation into the `Stream` creation. There are some more optimisations to do on both `Stream` and `Storage`, but it’s a first version and we will iterate. Let’s see what the `InMemoryBackend` looks like.
use crate::storage::{Storage, StreamCreationError, StreamDeletionError};
use crate::stream::Stream;
use std::collections::HashMap;
use std::str::FromStr;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
let stream = Stream::from_str(stream_uuid)?;
if self.streams.contains_key(stream.stream_uuid()) {
return Err(StreamCreationError::AlreadyExists);
}
self.streams.insert(stream_uuid.to_owned(), stream);
Ok(self.streams.get(stream_uuid).unwrap())
}
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
let stream = Stream::from_str(stream_uuid)?;
if !self.streams.contains_key(stream.stream_uuid()) {
return Err(StreamDeletionError::DoesntExists);
}
self.streams.remove(stream_uuid);
Ok(())
}
}
Something isn’t really nice here: we’re building a new `Stream` object inside each method just to validate the `stream_uuid`. What if we provided a `Stream` object instead of a `&str`? We could avoid some repetitive code, so let’s do that!
use crate::stream::{Stream, StreamError};
/// A `Storage` is responsible for storing and managing `Stream` and `Event` for a `Backend`
pub trait Storage {
/// Create a new stream with an identifier
///
/// # Errors
/// The stream creation can fail for multiple reasons:
///
/// - pure storage failure (unable to create the stream on the backend)
/// - The stream already exists
fn create_stream(&mut self, stream: Stream) -> Result<&Stream, StreamCreationError>;
/// Delete a stream from the `Backend`
///
/// Do we need to provide a hard/soft deletion?
///
/// # Errors
/// The stream deletion can fail for multiple reasons:
///
/// - pure storage failure (unable to delete the stream on the backend)
/// - The stream doesn't exists
fn delete_stream(&mut self, stream: &Stream) -> Result<(), StreamDeletionError>;
}
mod inmemory;
#[cfg(test)]
mod test;
#[derive(Debug, PartialEq)]
pub enum StreamCreationError {
AlreadyExists,
StreamError(StreamError),
StorageError(StorageError),
}
impl std::convert::From<StreamError> for StreamCreationError {
fn from(e: StreamError) -> Self {
Self::StreamError(e)
}
}
#[derive(Debug, PartialEq)]
pub enum StreamDeletionError {
DoesntExists,
StreamError(StreamError),
StorageError(StorageError),
}
impl std::convert::From<StreamError> for StreamDeletionError {
fn from(e: StreamError) -> Self {
Self::StreamError(e)
}
}
#[derive(Debug, PartialEq)]
pub enum StorageError {
Unknown,
}
We also update our `InMemoryBackend` code to match the trait modifications.
use crate::storage::{Storage, StreamCreationError, StreamDeletionError};
use crate::stream::Stream;
use std::collections::HashMap;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream: Stream) -> Result<&Stream, StreamCreationError> {
let stream_uuid = stream.stream_uuid().to_owned();
if self.streams.contains_key(&stream_uuid) {
return Err(StreamCreationError::AlreadyExists);
}
self.streams.insert(stream_uuid.to_owned(), stream);
Ok(self.streams.get(&stream_uuid).unwrap())
}
fn delete_stream(&mut self, stream: &Stream) -> Result<(), StreamDeletionError> {
if !self.streams.contains_key(stream.stream_uuid()) {
return Err(StreamDeletionError::DoesntExists);
}
self.streams.remove(stream.stream_uuid());
Ok(())
}
}
Our tests on the storage are simplified: we removed the `MalformedStreamUUID` tests because that validation is handled by `Stream` now.
use uuid::Uuid;
use crate::storage::{
inmemory::InMemoryBackend, Storage, StreamCreationError, StreamDeletionError,
};
use crate::stream::Stream;
use std::str::FromStr;
mod creation {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage
.create_stream(Stream::from_str(&uuid).unwrap())
.is_ok());
}
#[test]
fn fail_if_stream_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage
.create_stream(Stream::from_str(&uuid).unwrap())
.is_ok());
assert_eq!(
storage.create_stream(Stream::from_str(&uuid).unwrap()),
Err(StreamCreationError::AlreadyExists)
);
}
}
mod deletion {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage
.create_stream(Stream::from_str(&uuid).unwrap())
.is_ok());
assert!(storage
.delete_stream(&Stream::from_str(&uuid).unwrap())
.is_ok());
}
#[test]
fn fail_if_stream_doesnt_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert_eq!(
storage.delete_stream(&Stream::from_str(&uuid).unwrap()),
Err(StreamDeletionError::DoesntExists)
);
}
}
Next
In the next blog post we will create some restrictions around the `append` method to prevent unwanted behaviours. In the meantime I will add documentation and other explanations about the project to the repository.
You can see all the work done in this blog post here.
1. `Serde` provides a `type` value for an `Enum`, which represents the variant. We could maybe use this kind of trick to extract the variant from the serialized event. But it’s too early to define that, and there are some problems around it: the `Serialize` part of the `DomainEvent` is defined on the client side, not in the `event_store`, so we don’t have any control over what’s done. If the end user puts `#[serde(untagged)]` on his `DomainEvent`, well, we’re fucked. Feel free to tell me what you think about this use case in the comments! ↩︎