Table of contents
Chekov
CQRS/ES have been trending topics for some times now and there are some pretty nice libraries out there to build application following these patterns. But I found myself missing some of those tools to build Rust applications for a while so I decided to try to fulfill this gap by implementing it myself.
In this blog series I will cover my way of building an OpenSource framework called Chekov. I will try to describe how I design and implement the whole thing. I will start from scratch, I will fail sometimes, but I will learn a lot and you too. I will try to cover as much part of the reflexion I have, feel free to ask for more details on Twitter or at the end of each blog post.
But you may have a question:
What the f*ck is Chekov ?
Chekov is a Rust framework to help people building CQRS/ES application. It aims to provide everything you need to dispatch a command, produce events and build projections.
there are more complex and powerful features in the pipe but we will talk about it later in the process!
For now we will focus on having something that can work for the most simple project, we will iterate together on the features along the way! (Feel free to open an issue on the repository to provide feedbacks or feature suggestions!)
CQRS (Command Query Responsibility Segregation) is a pattern that separates Commands (Writes) from Queries (Reads). What does it mean?
It provides a pattern that separates the application layer into a command stack and a query stack.
- Commands are operations that change the application state and doesn’t return data.
- Queries are operations that return data without changing the application state.
Here’s a representation of how it works:
CQRS pattern exemple © Martin Fowler - https://martinfowler.com
EventSourcing is a pattern that persists the state of a business entity as a sequence of events. When the state of a business entity changes, a new event is appended to the entity’s events list. An entity’s state is an aggregation of all the entity’s events, meaning that you can rebuild the entity’s state by consuming the events from the origin if you want to.
Where to start with this?
First I had a to figure out how to split this large project in little pieces to be able to iterate.
I already know a little about CQRS/ES because I used this pattern a bunch of times for both personal and professional applications. I contribute to several discussions and OpenSource projects like Commanded (which is a really nice Elixir lib and really inspiring for Chekov).
But I wanted to be clear on what I will be doing for the next iteration. Starting with coding can be a problem but I want to be able to produce step by step milestones and iterate over time to improve the whole thing. That being said, I will certainly build something and do some refactoring on it in the next iteration but it’s ok!
I want chekov to be modular and easy to use, meaning that the first thing to do is to split the EventSourcing from the CQRS and so on. At this time I will only define three modules in the framework:
- event_store is responsible for appending events to the store and reading them on demand
- command is responsible for handling the input command and executing them on handlers
- chekov which is the glue between the others
It seems simple but each one of them have their own domain.
Do we want the command module to know how events are saved to the store? I don’t think either
Do we want the command module to know how the events are serialized? No
Do we want the event_store to know that this stream is an aggregate stream? Naahhh
As you can see there are plenty of questions to ask and to answer on this subject. Some of you may know a lot more about that than me so feel free to contribute!
I will start by defining and designing the basic elements of the event_store which seems to be the base of all the framework.
The event store
Let’s define some key principles for our event store. (this specs can evolve over the implementation time, if you want an up to date documentation go check the repository!)
Define the concept
Our event_store will deal with Event and Stream which will be store in a Storage. The event_store can use different kinds of Storage but he will expose the same public API allowing users to interact with multiple Storage of the same or different types.
An Event is a semantic log that describes something that has happened in the past, some call them Domain Events. Meaning that the event_store will not modify the event itself in any way. An Event is immutable and can’t be rollbacked or modified. As you can see I don’t talk about how events are generated because it’s out of scope. The event_store don’t generate Event, it will expose a public type that defines what an Event should look like but nothing more.
A Stream is a suite of events that can only grow in size. A Stream doesn’t own the Event but a link is created between Stream and Event. An Event can be displayed in multiple Stream but an Event is useless if it is not link to a Stream. Meaning that you can’t publish an event without linking it to a Stream, if it wasn’t linked then no one would be able to read it.
A StreamEvent is the link between an Event and a Stream.
A Storage is where the Events and Streams are stored.
Now that we have a little idea of the pieces, let’s design the first one which is the Storage.
Note that everything defines here will be persisted in some way, but it’s too early to define how it will be persisted.
Design: The storage
The Storage is the element which is responsible for storing the actual Events and for managing Streams creation and modifications.
We can use Storage as an abstraction to a Backend service which can be a database or anything else.
Also we want to be able to define different kinds of Storage for our event_store, every Storage will have the same public API but will use different implementation under the hood.
Let’s define what a Storage looks like and what we want to provide as public API.
A Storage must have
- an API to manage
Streams - an API to append and read
Events
Because we want the Storage to be implemented by multiple Backend we will
use a trait.
/// A `Storage` is responsible for storing and managing `Stream` and `Event`for a `Backend`
pub trait Storage {
/// Create a new stream with an identifier
///
/// # Errors
/// The stream creation can fail for multiple reasons:
///
/// - pure storage failure (unable to create the stream on the backend)
/// - The stream already exists
/// - The `stream_uuid` is invalid
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError>;
/// Delete a stream from the `Backend`
///
/// Do we need to provide a hard/soft deletion?
///
/// # Errors
/// The stream deletion can fail for multiple reasons:
///
/// - pure storage failure (unable to delete the stream on the backend)
/// - The stream doesn't exists
/// - The `stream_uuid` is invalid
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError>;
}
mod inmemory;
#[cfg(test)]
mod test;
/// Types that will be implemented later
#[derive(Debug, PartialEq)]
pub struct Stream {
deleted: bool,
}
#[derive(Debug, PartialEq)]
pub enum StreamCreationError {
AlreadyExists,
MalformedStreamUUID,
StorageError(StorageError),
}
#[derive(Debug, PartialEq)]
pub enum StreamDeletionError {
DoesntExists,
MalformedStreamUUID,
StorageError(StorageError),
}
#[derive(Debug, PartialEq)]
pub enum StorageError {
Unknown,
}
Now that we have a Storage trait we need to implement it in our first
Backend which will be an InMemory Backend.
use crate::storage::{Storage, Stream, StreamCreationError, StreamDeletionError};
use std::collections::HashMap;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
unimplemented!()
}
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
Ok(())
}
}
Our Backend implementation looks like this for now. Let’s add some tests:
use uuid::Uuid;
use crate::storage::{
inmemory::InMemoryBackend, Storage, StreamCreationError, StreamDeletionError,
};
mod creation {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage.create_stream(&uuid).is_ok());
}
#[test]
fn fail_if_stream_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage.create_stream(&uuid).is_ok());
assert_eq!(
storage.create_stream(&uuid),
Err(StreamCreationError::AlreadyExists)
);
}
#[test]
fn fail_if_stream_uuid_malformed() {
let mut storage = InMemoryBackend::default();
assert_eq!(
storage.create_stream("an uuid"),
Err(StreamCreationError::MalformedStreamUUID)
);
}
}
mod deletion {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage.create_stream(&uuid).is_ok());
assert!(storage.delete_stream(&uuid).is_ok());
}
#[test]
fn fail_if_stream_doesnt_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert_eq!(
storage.delete_stream(&uuid),
Err(StreamDeletionError::DoesntExists)
);
}
#[test]
fn fail_if_stream_uuid_malformed() {
let mut storage = InMemoryBackend::default();
assert_eq!(
storage.delete_stream("an uuid"),
Err(StreamDeletionError::MalformedStreamUUID)
);
}
}
running 6 tests
test storage::test::creation::fail_if_stream_uuid_malformed ... FAILED
test storage::test::creation::fail_if_stream_exists ... FAILED
test storage::test::creation::success ... FAILED
test storage::test::deletion::fail_if_stream_doesnt_exists ... FAILED
test storage::test::deletion::fail_if_stream_uuid_malformed ... FAILED
test storage::test::deletion::success ... FAILED
failures:
---- storage::test::creation::fail_if_stream_uuid_malformed stdout ----
thread 'storage::test::creation::fail_if_stream_uuid_malformed' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
---- storage::test::creation::fail_if_stream_exists stdout ----
thread 'storage::test::creation::fail_if_stream_exists' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
---- storage::test::creation::success stdout ----
thread 'storage::test::creation::success' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
---- storage::test::deletion::fail_if_stream_doesnt_exists stdout ----
thread 'storage::test::deletion::fail_if_stream_doesnt_exists' panicked at 'assertion failed: `(left == right)`
left: `Ok(())`,
right: `Err(DoesntExists)`', event_store/src/storage/test.rs:60:9
---- storage::test::deletion::fail_if_stream_uuid_malformed stdout ----
thread 'storage::test::deletion::fail_if_stream_uuid_malformed' panicked at 'assertion failed: `(left == right)`
left: `Ok(())`,
right: `Err(MalformedStreamUUID)`', event_store/src/storage/test.rs:70:9
---- storage::test::deletion::success stdout ----
thread 'storage::test::deletion::success' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
failures:
storage::test::creation::fail_if_stream_exists
storage::test::creation::fail_if_stream_uuid_malformed
storage::test::creation::success
storage::test::deletion::fail_if_stream_doesnt_exists
storage::test::deletion::fail_if_stream_uuid_malformed
storage::test::deletion::success
test result: FAILED. 0 passed; 6 failed; 0 ignored; 0 measured; 0 filtered out
Let’s implement the inner create_stream and delete_stream methods then.
use crate::storage::{Storage, Stream, StreamCreationError, StreamDeletionError};
use std::collections::HashMap;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
// Move this in a generic function later
// stream_uuid must be validated way before this point
if stream_uuid.contains(' ') {
return Err(StreamCreationError::MalformedStreamUUID);
}
if self.streams.contains_key(stream_uuid) {
return Err(StreamCreationError::AlreadyExists);
}
self.streams
.insert(stream_uuid.to_owned(), Stream { deleted: false });
Ok(self.streams.get(stream_uuid).unwrap())
}
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
// Move this in a generic function later
// stream_uuid must be validated way before this point
if stream_uuid.contains(' ') {
return Err(StreamDeletionError::MalformedStreamUUID);
}
if !self.streams.contains_key(stream_uuid) {
return Err(StreamDeletionError::DoesntExists);
}
self.streams.remove(stream_uuid);
Ok(())
}
}
running 6 tests
test storage::test::creation::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::fail_if_stream_doesnt_exists ... ok
test storage::test::creation::fail_if_stream_exists ... ok
test storage::test::creation::success ... ok
test storage::test::deletion::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::success ... ok
test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Design: The Event
Now that we are able to create and delete Stream we need to be able to create Event. We will talk about different kinds of events so let’s define some key principles to make it cristal clear.
- An
Eventdefined outside of theevent_storecrate is aDomainEventwhich represents an actual domain event (i.e.:MoneyDeposited,MoneyWithdrawn). - An
DomainEventcan be transformed into anUnsavedEventwhich is part of theevent_storecrate. AnUnsavedEventdoesn’t hold theDomainEventtype to be able to publish multiple kinds of events at the same time. - A
RecordedEventis defined and generated inside theevent_storecrate and can be transformed into aDomainEventby the end user.
Here’s a little graph to explain those interactions:
An Event must have
- an
event_uuidwhich is unique and identify the event - a
correlation_idwhich correlates multiple events - a
causation_idwhich defines who caused this event - a
typewhich is human readable - a
dataattribute which is defined by the library user - a
metadataattribute which is defined by the library user or by theevent_storeitself - a
created_atvalue that defines when the event has been created
So let’s define our Event trait and the UnsavedEvent type. As we defines earlier an Event is what the event_store expects as input. It needs to match some behaviour to be considered a valid event in our predefined model. An UnsavedEvent must have a type, a data attribute containing the DomainEvent serialized, some metadatas and two optional correlation_id and causation_id.
Because we need to be able to generate an UnsavedEvent from a type that implement Event we need to provide a try_from implementation to handle this. The Event trait must provide information that we can’t infer (like the event_type), we could have use the generic std::any::type_name method but it wouldn’t be helpful for Enum because we are missing the variant part (we could solve that with serde1). We will define an event_type method returning this information.
The data of the Event must be automatically serializable using serde, I’m thinking of using serde_json serialization to store the event. But for now we will just tell that Event needs to implement serde::Serialize.
use serde::Serialize;
/// Represent event that can be handled by an `EventStore`
pub trait Event: Serialize {
/// Return a static str which define the event type
///
/// This str must be as precise as possible.
fn event_type(&self) -> &'static str;
}
mod unsaved;
mod recorded;
We can now define our UnsavedEvent struct:
use super::Event;
use serde_json;
use uuid::Uuid;
/// An `UnsavedEvent` is created from a type that implement Event
///
/// This kind of event represents an unsaved event, meaning that it has less information
/// than a `RecordedEvent`. It's a generic form to simplify the event processing but also a way to
/// define `metadata`, `causation_id` and `correlation_id`
pub struct UnsavedEvent {
/// a `causation_id` defines who caused this event
causation_id: Option<Uuid>,
/// a `correlation_id` correlates multiple events
correlation_id: Option<Uuid>,
/// Human readable event type
event_type: String,
/// Payload of this event
data: String,
/// Metadata defined for this event
metadata: String,
}
#[derive(Debug)]
pub enum ParseEventError {
UnknownFailure,
}
impl From<serde_json::Error> for ParseEventError {
fn from(_: serde_json::Error) -> Self {
Self::UnknownFailure
}
}
impl UnsavedEvent {
pub(crate) fn try_from<E: Event>(event: &E) -> Result<Self, ParseEventError> {
Ok(Self {
causation_id: None,
correlation_id: None,
event_type: event.event_type().to_owned(),
data: serde_json::to_string(&event)?,
metadata: String::new(),
})
}
}
And adding some tests to validate that this simple implementation works:
use crate::event::{unsaved::UnsavedEvent, Event};
use super::*;
#[derive(Serialize)]
pub struct MyStructEvent {}
#[derive(Serialize)]
pub enum MyEnumEvent {
Created { id: i32 },
Updated(String),
Deleted,
}
impl Event for MyEnumEvent {
fn event_type(&self) -> &'static str {
match *self {
Self::Deleted => "MyEnumEvent::Deleted",
Self::Created { .. } => "MyEnumEvent::Created",
Self::Updated(_) => "MyEnumEvent::Updated",
}
}
}
impl Event for MyStructEvent {
fn event_type(&self) -> &'static str {
"MyStructEvent"
}
}
mod unsaved {
use super::*;
#[test]
fn must_have_a_valide_event_type() {
let source_events = vec![
MyEnumEvent::Created { id: 1 },
MyEnumEvent::Updated("Updated".into()),
MyEnumEvent::Deleted,
];
let mut produces_events: Vec<UnsavedEvent> = source_events
.iter()
.map(UnsavedEvent::try_from)
.filter(Result::is_ok)
.map(Result::unwrap)
.collect();
let next = MyStructEvent {};
produces_events.push(UnsavedEvent::try_from(&next).unwrap());
let mut expected = vec![
"MyEnumEvent::Created",
"MyEnumEvent::Updated",
"MyEnumEvent::Deleted",
"MyStructEvent",
];
expected
.into_iter()
.zip(produces_events)
.for_each(|(ex, real)| assert_eq!(ex, real.event_type));
}
}
running 7 tests
test event::test::unsaved::must_have_a_valide_event_type ... ok
test storage::test::creation::fail_if_stream_exists ... ok
test storage::test::creation::fail_if_stream_uuid_malformed ... ok
test storage::test::creation::success ... ok
test storage::test::deletion::fail_if_stream_doesnt_exists ... ok
test storage::test::deletion::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::success ... ok
test result: ok. 7 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
As we can see the From implementation isn’t really useful because we can’t specify causation_id, correlation_id nor metadata. We will switch to a Builder in the next iteration.
The RecordedEvent will have much more information than UnsavedEvent. In our pattern, Event can’t be saved without a Stream and there is no point to retrieve an Event outside a Stream. The EventStore will expose methods to interact with Stream which will return RecordedEvent. Our RecordedEvent which can be defined as StreamEvent will have every needed piece of information related to that Stream version of the inserted Event.
Let’s talk a little more about what we defined.
- A
RecordedEventis the aggregation of information of bothStreamEventandEvent - A
RecordedEventmust have anevent_numberrelated to thestream_version - A
RecordedEventmust have a uniqueevent_id - A
RecordedEventmust have astream_uuid - A
RecordedEventmust have astream_version
use uuid::Uuid;
/// A `RecordedEvent` represents an `Event` which have been appended to a `Stream`
pub struct RecordedEvent {
/// an incrementing and gapless integer used to order the event in a stream.
event_number: i32,
/// Unique identifier for this event
event_uuid: Uuid,
/// The stream identifier for this event
stream_uuid: Uuid,
/// The stream version when this event was appended
stream_version: i32,
/// a `causation_id` defines who caused this event
causation_id: Option<Uuid>,
/// a `correlation_id` correlates multiple events
correlation_id: Option<Uuid>,
/// Human readable event type
event_type: String,
/// Payload of this event
data: String,
/// Metadata defined for this event
metadata: String,
/// Event time creation
created_at: String,
}
Now that we have a Storage and some Events, let’s talk about the next big subject.
Design: The Stream
A Stream can be seen as a bunch of events representing a concept. An aggregate state is a Stream for example. But the Stream concept can go way further, instead of just having aggregates related Stream we can define much more useful things like feature related Stream (e.g: Having a Stream containing all AccountCreated event, another one containing all users related events, etc).
But what’s a Stream if we put aside events. A Stream holds information like his current version, a stream_uuid which will be further discussed later and of course a created_at and deleted_at. The link between Stream and Event is StreamEvent which can be transformed to RecordedEvent by the EventStore.
A Stream must have
- a
stream_uuidwhich is unique and identifies the event - a
versionwhich defines the current version of this stream
A StreamEvent must have
- a
event_uuidwhich is equal to theEvent’sevent_uuid. - a
stream_uuidwhich is equal to theStream’sstream_uuid. - a
stream_versionwhich defines the version of the stream for this event.
Let’s define our Stream struct then.
/// A `Stream` represents an `Event` stream
#[derive(Debug, PartialEq)]
pub struct Stream {
/// The stream identifier which is unique
stream_uuid: String,
/// The current stream version number
stream_version: i32,
/// The creation date of the stream
created_at: String,
/// The deletion date of the stream
deleted_at: String,
}
impl Stream {
pub fn stream_uuid(&self) -> &str {
self.stream_uuid.as_ref()
}
}
#[derive(Debug, PartialEq)]
pub enum StreamError {
MalformedStreamUUID,
}
impl std::str::FromStr for Stream {
type Err = StreamError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.contains(' ') {
return Err(StreamError::MalformedStreamUUID);
}
Ok(Self {
stream_uuid: s.into(),
stream_version: 0,
created_at: String::new(),
deleted_at: String::new(),
})
}
}
As you can see I moved the MalformedStreamUUID into the Stream creation. There are some more optimisations to do with both Stream and Storage but it’s a first version and we will iterate. Let’s see what InMemoryBackend looks like.
use crate::storage::{Storage, StreamCreationError, StreamDeletionError};
use crate::stream::Stream;
use std::collections::HashMap;
use std::str::FromStr;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
let stream = Stream::from_str(stream_uuid)?;
if self.streams.contains_key(stream.stream_uuid()) {
return Err(StreamCreationError::AlreadyExists);
}
self.streams.insert(stream_uuid.to_owned(), stream);
Ok(self.streams.get(stream_uuid).unwrap())
}
fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
let stream = Stream::from_str(stream_uuid)?;
if !self.streams.contains_key(stream.stream_uuid()) {
return Err(StreamDeletionError::DoesntExists);
}
self.streams.remove(stream_uuid);
Ok(())
}
}
Something isn’t really nice here, we’re building a new Stream object inside each method to validate the stream_uuid. What if we provide a Stream object instead of a &str, we could avoid some repetitive code, let’s do that!
use crate::stream::{Stream, StreamError};
/// A `Storage` is responsible for storing and managing `Stream` and `Event`for a `Backend`
pub trait Storage {
/// Create a new stream with an identifier
///
/// # Errors
/// The stream creation can fail for multiple reasons:
///
/// - pure storage failure (unable to create the stream on the backend)
/// - The stream already exists
fn create_stream(&mut self, stream: Stream) -> Result<&Stream, StreamCreationError>;
/// Delete a stream from the `Backend`
///
/// Do we need to provide a hard/soft deletion?
///
/// # Errors
/// The stream deletion can fail for multiple reasons:
///
/// - pure storage failure (unable to delete the stream on the backend)
/// - The stream doesn't exists
fn delete_stream(&mut self, stream: &Stream) -> Result<(), StreamDeletionError>;
}
mod inmemory;
#[cfg(test)]
mod test;
#[derive(Debug, PartialEq)]
pub enum StreamCreationError {
AlreadyExists,
StreamError(StreamError),
StorageError(StorageError),
}
impl std::convert::From<StreamError> for StreamCreationError {
fn from(e: StreamError) -> Self {
Self::StreamError(e)
}
}
#[derive(Debug, PartialEq)]
pub enum StreamDeletionError {
DoesntExists,
StreamError(StreamError),
StorageError(StorageError),
}
impl std::convert::From<StreamError> for StreamDeletionError {
fn from(e: StreamError) -> Self {
Self::StreamError(e)
}
}
#[derive(Debug, PartialEq)]
pub enum StorageError {
Unknown,
}
We also update our InMemoryBackend code to match the trait modifications.
use crate::storage::{Storage, StreamCreationError, StreamDeletionError};
use crate::stream::Stream;
use std::collections::HashMap;
use std::str::FromStr;
#[derive(Default)]
pub struct InMemoryBackend {
streams: HashMap<String, Stream>,
}
impl Storage for InMemoryBackend {
fn create_stream(&mut self, stream: Stream) -> Result<&Stream, StreamCreationError> {
let stream_uuid = stream.stream_uuid().to_owned();
if self.streams.contains_key(&stream_uuid) {
return Err(StreamCreationError::AlreadyExists);
}
self.streams.insert(stream_uuid.to_owned(), stream);
Ok(self.streams.get(&stream_uuid).unwrap())
}
fn delete_stream(&mut self, stream: &Stream) -> Result<(), StreamDeletionError> {
if !self.streams.contains_key(stream.stream_uuid()) {
return Err(StreamDeletionError::DoesntExists);
}
self.streams.remove(stream.stream_uuid());
Ok(())
}
}
Our tests on the storage are simplified, we removed the MalformedStreamUUID tests because it’s handled by the Stream now.
use uuid::Uuid;
use crate::storage::{
inmemory::InMemoryBackend, Storage, StreamCreationError, StreamDeletionError,
};
use crate::stream::{Stream, StreamError};
use std::str::FromStr;
mod creation {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage
.create_stream(Stream::from_str(&uuid).unwrap())
.is_ok());
}
#[test]
fn fail_if_stream_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage
.create_stream(Stream::from_str(&uuid).unwrap())
.is_ok());
assert_eq!(
storage.create_stream(Stream::from_str(&uuid).unwrap()),
Err(StreamCreationError::AlreadyExists)
);
}
}
mod deletion {
use super::*;
#[test]
fn success() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert!(storage
.create_stream(Stream::from_str(&uuid).unwrap())
.is_ok());
assert!(storage
.delete_stream(&Stream::from_str(&uuid).unwrap())
.is_ok());
}
#[test]
fn fail_if_stream_doesnt_exists() {
let mut storage = InMemoryBackend::default();
let uuid = Uuid::new_v4().to_string();
assert_eq!(
storage.delete_stream(&Stream::from_str(&uuid).unwrap()),
Err(StreamDeletionError::DoesntExists)
);
}
}
Next
In the next blog post we will create some restrictions around the append method to prevent unwanted behaviours. In the meantime I will add documentation and other explanation about the project in the repository.
You can see all the work done in this blog post here.
-
Serdeis providingtypevalue forEnumwhich represents the variant. We could maybe use this kinds of tricks to extract the variant from the serialized event. But it’s too early to define that and there are some problems around this. TheSerializepart of theDomainEventis defined on the client side not in theevent_storeso we don’t have any control over what’s done. If the end user uses#[serde(untagged)]on hisDomainEvent, well, we’re fucked. Feel free to tell me what you’re thinking about this use case in comments! ↩︎


