Building Chekov - Part 1: Design the EventStore

Jun 17, 2020 • 15 minutes to read
This article is part of a larger set : Building Chekov

Chekov

CQRS/ES have been trending topics for some times now and there are some pretty nice libraries out there to build application following these patterns. But I found myself missing some of those tools to build Rust applications for a while so I decided to try to fulfill this gap by implementing it myself.

In this blog series I will cover my way of building an OpenSource framework called Chekov. I will try to describe how I design and implement the whole thing. I will start from scratch, I will fail sometimes, but I will learn a lot and you too. I will try to cover as much part of the reflexion I have, feel free to ask for more details on Twitter or at the end of each blog post.

But you may have a question:

What the f*ck is Chekov ?

Chekov is a Rust framework to help people building CQRS/ES application. It aims to provide everything you need to dispatch a command, produce events and build projections.

Ok

there are more complex and powerful features in the pipe but we will talk about it later in the process!

For now we will focus on having something that can work for the most simple project, we will iterate together on the features along the way! (Feel free to open an issue on the repository to provide feedbacks or feature suggestions!)

CQRS (Command Query Responsibility Segregation) is a pattern that separates Commands (Writes) from Queries (Reads). What does it mean?

It provides a pattern that separates the application layer into a command stack and a query stack.

  • Commands are operations that change the application state and doesn’t return data.
  • Queries are operations that return data without changing the application state.

Here’s a representation of how it works:

CQRS pattern exemple © Martin Fowler - https://martinfowler.com

EventSourcing is a pattern that persists the state of a business entity as a sequence of events. When the state of a business entity changes, a new event is appended to the entity’s events list. An entity’s state is an aggregation of all the entity’s events, meaning that you can rebuild the entity’s state by consuming the events from the origin if you want to.


Where to start with this?

First I had a to figure out how to split this large project in little pieces to be able to iterate.
I already know a little about CQRS/ES because I used this pattern a bunch of times for both personal and professional applications. I contribute to several discussions and OpenSource projects like Commanded (which is a really nice Elixir lib and really inspiring for Chekov).
But I wanted to be clear on what I will be doing for the next iteration. Starting with coding can be a problem but I want to be able to produce step by step milestones and iterate over time to improve the whole thing. That being said, I will certainly build something and do some refactoring on it in the next iteration but it’s ok!

I want chekov to be modular and easy to use, meaning that the first thing to do is to split the EventSourcing from the CQRS and so on. At this time I will only define three modules in the framework:

  • event_store is responsible for appending events to the store and reading them on demand
  • command is responsible for handling the input command and executing them on handlers
  • chekov which is the glue between the others

It seems simple but each one of them have their own domain.

Do we want the event_store to be responsible to execute commands? I don’t think so.
Do we want the command module to know how events are saved to the store? I don’t think either
Do we want the command module to know how the events are serialized? No
Do we want the event_store to know that this stream is an aggregate stream? Naahhh

As you can see there are plenty of questions to ask and to answer on this subject. Some of you may know a lot more about that than me so feel free to contribute!

I will start by defining and designing the basic elements of the event_store which seems to be the base of all the framework.

The event store

Let’s define some key principles for our event store. (this specs can evolve over the implementation time, if you want an up to date documentation go check the repository!)

Define the concept

Our event_store will deal with Event and Stream which will be store in a Storage. The event_store can use different kinds of Storage but he will expose the same public API allowing users to interact with multiple Storage of the same or different types.

An Event is a semantic log that describes something that has happened in the past, some call them Domain Events. Meaning that the event_store will not modify the event itself in any way. An Event is immutable and can’t be rollbacked or modified. As you can see I don’t talk about how events are generated because it’s out of scope. The event_store don’t generate Event, it will expose a public type that defines what an Event should look like but nothing more.

A Stream is a suite of events that can only grow in size. A Stream doesn’t own the Event but a link is created between Stream and Event. An Event can be displayed in multiple Stream but an Event is useless if it is not link to a Stream. Meaning that you can’t publish an event without linking it to a Stream, if it wasn’t linked then no one would be able to read it.

A StreamEvent is the link between an Event and a Stream.

A Storage is where the Events and Streams are stored.

Now that we have a little idea of the pieces, let’s design the first one which is the Storage.

Note that everything defines here will be persisted in some way, but it’s too early to define how it will be persisted.

Design: The storage

The Storage is the element which is responsible for storing the actual Events and for managing Streams creation and modifications. We can use Storage as an abstraction to a Backend service which can be a database or anything else. Also we want to be able to define different kinds of Storage for our event_store, every Storage will have the same public API but will use different implementation under the hood.

Let’s define what a Storage looks like and what we want to provide as public API.

A Storage must have

  • an API to manage Streams
  • an API to append and read Events

Because we want the Storage to be implemented by multiple Backend we will use a trait.

/// A `Storage` is responsible for storing and managing `Stream` and `Event`for a `Backend`
pub trait Storage {
    /// Create a new stream with an identifier
    ///
    /// # Errors
    /// The stream creation can fail for multiple reasons:
    ///
    /// - pure storage failure (unable to create the stream on the backend)
    /// - The stream already exists
    /// - The `stream_uuid` is invalid
    fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError>;

    /// Delete a stream from the `Backend`
    ///
    /// Do we need to provide a hard/soft deletion?
    ///
    /// # Errors
    /// The stream deletion can fail for multiple reasons:
    ///
    /// - pure storage failure (unable to delete the stream on the backend)
    /// - The stream doesn't exists
    /// - The `stream_uuid` is invalid
    fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError>;
}

mod inmemory;

#[cfg(test)]
mod test;

/// Types that will be implemented later
#[derive(Debug, PartialEq)]
pub struct Stream {
    deleted: bool,
}

#[derive(Debug, PartialEq)]
pub enum StreamCreationError {
    AlreadyExists,
    MalformedStreamUUID,
    StorageError(StorageError),
}

#[derive(Debug, PartialEq)]
pub enum StreamDeletionError {
    DoesntExists,
    MalformedStreamUUID,
    StorageError(StorageError),
}

#[derive(Debug, PartialEq)]
pub enum StorageError {
    Unknown,
}

Now that we have a Storage trait we need to implement it in our first Backend which will be an InMemory Backend.

use crate::storage::{Storage, Stream, StreamCreationError, StreamDeletionError};
use std::collections::HashMap;

#[derive(Default)]
pub struct InMemoryBackend {
    streams: HashMap<String, Stream>,
}

impl Storage for InMemoryBackend {
    fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
        unimplemented!()
    }

    fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
        Ok(())
    }
}

Our Backend implementation looks like this for now. Let’s add some tests:

use uuid::Uuid;

use crate::storage::{
    inmemory::InMemoryBackend, Storage, StreamCreationError, StreamDeletionError,
};

mod creation {
    use super::*;

    #[test]
    fn success() {
        let mut storage = InMemoryBackend::default();
        let uuid = Uuid::new_v4().to_string();

        assert!(storage.create_stream(&uuid).is_ok());
    }

    #[test]
    fn fail_if_stream_exists() {
        let mut storage = InMemoryBackend::default();

        let uuid = Uuid::new_v4().to_string();

        assert!(storage.create_stream(&uuid).is_ok());
        assert_eq!(
            storage.create_stream(&uuid),
            Err(StreamCreationError::AlreadyExists)
        );
    }

    #[test]
    fn fail_if_stream_uuid_malformed() {
        let mut storage = InMemoryBackend::default();

        assert_eq!(
            storage.create_stream("an uuid"),
            Err(StreamCreationError::MalformedStreamUUID)
        );
    }
}

mod deletion {
    use super::*;

    #[test]
    fn success() {
        let mut storage = InMemoryBackend::default();
        let uuid = Uuid::new_v4().to_string();

        assert!(storage.create_stream(&uuid).is_ok());
        assert!(storage.delete_stream(&uuid).is_ok());
    }

    #[test]
    fn fail_if_stream_doesnt_exists() {
        let mut storage = InMemoryBackend::default();

        let uuid = Uuid::new_v4().to_string();

        assert_eq!(
            storage.delete_stream(&uuid),
            Err(StreamDeletionError::DoesntExists)
        );
    }

    #[test]
    fn fail_if_stream_uuid_malformed() {
        let mut storage = InMemoryBackend::default();

        assert_eq!(
            storage.delete_stream("an uuid"),
            Err(StreamDeletionError::MalformedStreamUUID)
        );
    }
}

running 6 tests
test storage::test::creation::fail_if_stream_uuid_malformed ... FAILED
test storage::test::creation::fail_if_stream_exists ... FAILED
test storage::test::creation::success ... FAILED
test storage::test::deletion::fail_if_stream_doesnt_exists ... FAILED
test storage::test::deletion::fail_if_stream_uuid_malformed ... FAILED
test storage::test::deletion::success ... FAILED

failures:

---- storage::test::creation::fail_if_stream_uuid_malformed stdout ----
thread 'storage::test::creation::fail_if_stream_uuid_malformed' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

---- storage::test::creation::fail_if_stream_exists stdout ----
thread 'storage::test::creation::fail_if_stream_exists' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9

---- storage::test::creation::success stdout ----
thread 'storage::test::creation::success' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9

---- storage::test::deletion::fail_if_stream_doesnt_exists stdout ----
thread 'storage::test::deletion::fail_if_stream_doesnt_exists' panicked at 'assertion failed: `(left == right)`
  left: `Ok(())`,
 right: `Err(DoesntExists)`', event_store/src/storage/test.rs:60:9

---- storage::test::deletion::fail_if_stream_uuid_malformed stdout ----
thread 'storage::test::deletion::fail_if_stream_uuid_malformed' panicked at 'assertion failed: `(left == right)`
  left: `Ok(())`,
 right: `Err(MalformedStreamUUID)`', event_store/src/storage/test.rs:70:9

---- storage::test::deletion::success stdout ----
thread 'storage::test::deletion::success' panicked at 'not implemented', event_store/src/storage/inmemory.rs:11:9


failures:
    storage::test::creation::fail_if_stream_exists
    storage::test::creation::fail_if_stream_uuid_malformed
    storage::test::creation::success
    storage::test::deletion::fail_if_stream_doesnt_exists
    storage::test::deletion::fail_if_stream_uuid_malformed
    storage::test::deletion::success

test result: FAILED. 0 passed; 6 failed; 0 ignored; 0 measured; 0 filtered out

Let’s implement the inner create_stream and delete_stream methods then.

use crate::storage::{Storage, Stream, StreamCreationError, StreamDeletionError};
use std::collections::HashMap;

#[derive(Default)]
pub struct InMemoryBackend {
    streams: HashMap<String, Stream>,
}

impl Storage for InMemoryBackend {
    fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
        // Move this in a generic function later
        // stream_uuid must be validated way before this point
        if stream_uuid.contains(' ') {
            return Err(StreamCreationError::MalformedStreamUUID);
        }

        if self.streams.contains_key(stream_uuid) {
            return Err(StreamCreationError::AlreadyExists);
        }

        self.streams
            .insert(stream_uuid.to_owned(), Stream { deleted: false });

        Ok(self.streams.get(stream_uuid).unwrap())
    }

    fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
        // Move this in a generic function later
        // stream_uuid must be validated way before this point
        if stream_uuid.contains(' ') {
            return Err(StreamDeletionError::MalformedStreamUUID);
        }

        if !self.streams.contains_key(stream_uuid) {
            return Err(StreamDeletionError::DoesntExists);
        }

        self.streams.remove(stream_uuid);

        Ok(())
    }
}

running 6 tests
test storage::test::creation::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::fail_if_stream_doesnt_exists ... ok
test storage::test::creation::fail_if_stream_exists ... ok
test storage::test::creation::success ... ok
test storage::test::deletion::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::success ... ok

test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out


running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

Design: The Event

Now that we are able to create and delete Stream we need to be able to create Event. We will talk about different kinds of events so let’s define some key principles to make it cristal clear.

  • An Event defined outside of the event_store crate is a DomainEvent which represents an actual domain event (i.e.: MoneyDeposited, MoneyWithdrawn).
  • An DomainEvent can be transformed into an UnsavedEvent which is part of the event_store crate. An UnsavedEvent doesn’t hold the DomainEvent type to be able to publish multiple kinds of events at the same time.
  • A RecordedEvent is defined and generated inside the event_store crate and can be transformed into a DomainEvent by the end user.

Here’s a little graph to explain those interactions:

sequenceDiagram Client-->>EventStore: append DomainEvent EventStore-->>EventStore: Convert to UnsavedEvent EventStore-->>Storage: append UnsavedEvent Note over Client,Storage: Events have been successfully appended Client-->>EventStore: read stream EventStore-->>Storage: fetch events EventStore-->>Client: List of RecordedEvents Client-->>Client: Convert to List of DomainEvents

An Event must have

  • an event_uuid which is unique and identify the event
  • a correlation_id which correlates multiple events
  • a causation_id which defines who caused this event
  • a type which is human readable
  • a data attribute which is defined by the library user
  • a metadata attribute which is defined by the library user or by the event_store itself
  • a created_at value that defines when the event has been created

So let’s define our Event trait and the UnsavedEvent type. As we defines earlier an Event is what the event_store expects as input. It needs to match some behaviour to be considered a valid event in our predefined model. An UnsavedEvent must have a type, a data attribute containing the DomainEvent serialized, some metadatas and two optional correlation_id and causation_id.

Because we need to be able to generate an UnsavedEvent from a type that implement Event we need to provide a try_from implementation to handle this. The Event trait must provide information that we can’t infer (like the event_type), we could have use the generic std::any::type_name method but it wouldn’t be helpful for Enum because we are missing the variant part (we could solve that with serde1). We will define an event_type method returning this information.

The data of the Event must be automatically serializable using serde, I’m thinking of using serde_json serialization to store the event. But for now we will just tell that Event needs to implement serde::Serialize.

use serde::Serialize;

/// Represent event that can be handled by an `EventStore`
pub trait Event: Serialize {
    /// Return a static str which define the event type
    ///
    /// This str must be as precise as possible.
    fn event_type(&self) -> &'static str;
}

mod unsaved;
mod recorded;

We can now define our UnsavedEvent struct:

use super::Event;
use serde_json;
use uuid::Uuid;

/// An `UnsavedEvent` is created from a type that implement Event
///
/// This kind of event represents an unsaved event, meaning that it has less information
/// than a `RecordedEvent`. It's a generic form to simplify the event processing but also a way to
/// define `metadata`, `causation_id` and `correlation_id`
pub struct UnsavedEvent {
    /// a `causation_id` defines who caused this event
    causation_id: Option<Uuid>,
    /// a `correlation_id` correlates multiple events
    correlation_id: Option<Uuid>,
    /// Human readable event type
    event_type: String,
    /// Payload of this event
    data: String,
    /// Metadata defined for this event
    metadata: String,
}

#[derive(Debug)]
pub enum ParseEventError {
    UnknownFailure,
}

impl From<serde_json::Error> for ParseEventError {
    fn from(_: serde_json::Error) -> Self {
        Self::UnknownFailure
    }
}

impl UnsavedEvent {
    pub(crate) fn try_from<E: Event>(event: &E) -> Result<Self, ParseEventError> {
        Ok(Self {
            causation_id: None,
            correlation_id: None,
            event_type: event.event_type().to_owned(),
            data: serde_json::to_string(&event)?,
            metadata: String::new(),
        })
    }
}

And adding some tests to validate that this simple implementation works:

use crate::event::{unsaved::UnsavedEvent, Event};

use super::*;

#[derive(Serialize)]
pub struct MyStructEvent {}

#[derive(Serialize)]
pub enum MyEnumEvent {
    Created { id: i32 },
    Updated(String),
    Deleted,
}

impl Event for MyEnumEvent {
    fn event_type(&self) -> &'static str {
        match *self {
            Self::Deleted => "MyEnumEvent::Deleted",
            Self::Created { .. } => "MyEnumEvent::Created",
            Self::Updated(_) => "MyEnumEvent::Updated",
        }
    }
}

impl Event for MyStructEvent {
    fn event_type(&self) -> &'static str {
        "MyStructEvent"
    }
}

mod unsaved {
    use super::*;

    #[test]
    fn must_have_a_valide_event_type() {
        let source_events = vec![
            MyEnumEvent::Created { id: 1 },
            MyEnumEvent::Updated("Updated".into()),
            MyEnumEvent::Deleted,
        ];

        let mut produces_events: Vec<UnsavedEvent> = source_events
            .iter()
            .map(UnsavedEvent::try_from)
            .filter(Result::is_ok)
            .map(Result::unwrap)
            .collect();

        let next = MyStructEvent {};

        produces_events.push(UnsavedEvent::try_from(&next).unwrap());

        let mut expected = vec![
            "MyEnumEvent::Created",
            "MyEnumEvent::Updated",
            "MyEnumEvent::Deleted",
            "MyStructEvent",
        ];
        expected
            .into_iter()
            .zip(produces_events)
            .for_each(|(ex, real)| assert_eq!(ex, real.event_type));
    }
}

running 7 tests
test event::test::unsaved::must_have_a_valide_event_type ... ok
test storage::test::creation::fail_if_stream_exists ... ok
test storage::test::creation::fail_if_stream_uuid_malformed ... ok
test storage::test::creation::success ... ok
test storage::test::deletion::fail_if_stream_doesnt_exists ... ok
test storage::test::deletion::fail_if_stream_uuid_malformed ... ok
test storage::test::deletion::success ... ok

test result: ok. 7 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out


running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

As we can see the From implementation isn’t really useful because we can’t specify causation_id, correlation_id nor metadata. We will switch to a Builder in the next iteration.

The RecordedEvent will have much more information than UnsavedEvent. In our pattern, Event can’t be saved without a Stream and there is no point to retrieve an Event outside a Stream. The EventStore will expose methods to interact with Stream which will return RecordedEvent. Our RecordedEvent which can be defined as StreamEvent will have every needed piece of information related to that Stream version of the inserted Event.

Let’s talk a little more about what we defined.

  • A RecordedEvent is the aggregation of information of both StreamEvent and Event
  • A RecordedEvent must have an event_number related to the stream_version
  • A RecordedEvent must have a unique event_id
  • A RecordedEvent must have a stream_uuid
  • A RecordedEvent must have a stream_version
use uuid::Uuid;

/// A `RecordedEvent` represents an `Event` which have been appended to a `Stream`
pub struct RecordedEvent {
    /// an incrementing and gapless integer used to order the event in a stream.
    event_number: i32,
    /// Unique identifier for this event
    event_uuid: Uuid,
    /// The stream identifier for this event
    stream_uuid: Uuid,
    /// The stream version when this event was appended
    stream_version: i32,
    /// a `causation_id` defines who caused this event
    causation_id: Option<Uuid>,
    /// a `correlation_id` correlates multiple events
    correlation_id: Option<Uuid>,
    /// Human readable event type
    event_type: String,
    /// Payload of this event
    data: String,
    /// Metadata defined for this event
    metadata: String,
    /// Event time creation
    created_at: String,
}

Now that we have a Storage and some Events, let’s talk about the next big subject.

Design: The Stream

A Stream can be seen as a bunch of events representing a concept. An aggregate state is a Stream for example. But the Stream concept can go way further, instead of just having aggregates related Stream we can define much more useful things like feature related Stream (e.g: Having a Stream containing all AccountCreated event, another one containing all users related events, etc).

But what’s a Stream if we put aside events. A Stream holds information like his current version, a stream_uuid which will be further discussed later and of course a created_at and deleted_at. The link between Stream and Event is StreamEvent which can be transformed to RecordedEvent by the EventStore.

A Stream must have

  • a stream_uuid which is unique and identifies the event
  • a version which defines the current version of this stream

A StreamEvent must have

  • a event_uuid which is equal to the Event's event_uuid.
  • a stream_uuid which is equal to the Stream's stream_uuid.
  • a stream_version which defines the version of the stream for this event.

Let’s define our Stream struct then.

/// A `Stream` represents an `Event` stream
#[derive(Debug, PartialEq)]
pub struct Stream {
    /// The stream identifier which is unique
    stream_uuid: String,
    /// The current stream version number
    stream_version: i32,
    /// The creation date of the stream
    created_at: String,
    /// The deletion date of the stream
    deleted_at: String,
}

impl Stream {
    pub fn stream_uuid(&self) -> &str {
        self.stream_uuid.as_ref()
    }
}
#[derive(Debug, PartialEq)]
pub enum StreamError {
    MalformedStreamUUID,
}

impl std::str::FromStr for Stream {
    type Err = StreamError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.contains(' ') {
            return Err(StreamError::MalformedStreamUUID);
        }

        Ok(Self {
            stream_uuid: s.into(),
            stream_version: 0,
            created_at: String::new(),
            deleted_at: String::new(),
        })
    }
}

As you can see I moved the MalformedStreamUUID into the Stream creation. There are some more optimisations to do with both Stream and Storage but it’s a first version and we will iterate. Let’s see what InMemoryBackend looks like.

use crate::storage::{Storage, StreamCreationError, StreamDeletionError};
use crate::stream::Stream;
use std::collections::HashMap;
use std::str::FromStr;

#[derive(Default)]
pub struct InMemoryBackend {
    streams: HashMap<String, Stream>,
}

impl Storage for InMemoryBackend {
    fn create_stream(&mut self, stream_uuid: &str) -> Result<&Stream, StreamCreationError> {
        let stream = Stream::from_str(stream_uuid)?;

        if self.streams.contains_key(stream.stream_uuid()) {
            return Err(StreamCreationError::AlreadyExists);
        }

        self.streams.insert(stream_uuid.to_owned(), stream);

        Ok(self.streams.get(stream_uuid).unwrap())
    }

    fn delete_stream(&mut self, stream_uuid: &str) -> Result<(), StreamDeletionError> {
        let stream = Stream::from_str(stream_uuid)?;

        if !self.streams.contains_key(stream.stream_uuid()) {
            return Err(StreamDeletionError::DoesntExists);
        }

        self.streams.remove(stream_uuid);

        Ok(())
    }
}

Something isn’t really nice here, we’re building a new Stream object inside each method to validate the stream_uuid. What if we provide a Stream object instead of a &str, we could avoid some repetitive code, let’s do that!

use crate::stream::{Stream, StreamError};

/// A `Storage` is responsible for storing and managing `Stream` and `Event`for a `Backend`
pub trait Storage {
    /// Create a new stream with an identifier
    ///
    /// # Errors
    /// The stream creation can fail for multiple reasons:
    ///
    /// - pure storage failure (unable to create the stream on the backend)
    /// - The stream already exists
    fn create_stream(&mut self, stream: Stream) -> Result<&Stream, StreamCreationError>;

    /// Delete a stream from the `Backend`
    ///
    /// Do we need to provide a hard/soft deletion?
    ///
    /// # Errors
    /// The stream deletion can fail for multiple reasons:
    ///
    /// - pure storage failure (unable to delete the stream on the backend)
    /// - The stream doesn't exists
    fn delete_stream(&mut self, stream: &Stream) -> Result<(), StreamDeletionError>;
}

mod inmemory;

#[cfg(test)]
mod test;

#[derive(Debug, PartialEq)]
pub enum StreamCreationError {
    AlreadyExists,
    StreamError(StreamError),
    StorageError(StorageError),
}

impl std::convert::From<StreamError> for StreamCreationError {
    fn from(e: StreamError) -> Self {
        Self::StreamError(e)
    }
}

#[derive(Debug, PartialEq)]
pub enum StreamDeletionError {
    DoesntExists,
    StreamError(StreamError),
    StorageError(StorageError),
}

impl std::convert::From<StreamError> for StreamDeletionError {
    fn from(e: StreamError) -> Self {
        Self::StreamError(e)
    }
}

#[derive(Debug, PartialEq)]
pub enum StorageError {
    Unknown,
}

We also update our InMemoryBackend code to match the trait modifications.

use crate::storage::{Storage, StreamCreationError, StreamDeletionError};
use crate::stream::Stream;
use std::collections::HashMap;
use std::str::FromStr;

#[derive(Default)]
pub struct InMemoryBackend {
    streams: HashMap<String, Stream>,
}

impl Storage for InMemoryBackend {
    fn create_stream(&mut self, stream: Stream) -> Result<&Stream, StreamCreationError> {
        let stream_uuid = stream.stream_uuid().to_owned();

        if self.streams.contains_key(&stream_uuid) {
            return Err(StreamCreationError::AlreadyExists);
        }

        self.streams.insert(stream_uuid.to_owned(), stream);

        Ok(self.streams.get(&stream_uuid).unwrap())
    }

    fn delete_stream(&mut self, stream: &Stream) -> Result<(), StreamDeletionError> {
        if !self.streams.contains_key(stream.stream_uuid()) {
            return Err(StreamDeletionError::DoesntExists);
        }

        self.streams.remove(stream.stream_uuid());

        Ok(())
    }
}

Our tests on the storage are simplified, we removed the MalformedStreamUUID tests because it’s handled by the Stream now.

use uuid::Uuid;

use crate::storage::{
    inmemory::InMemoryBackend, Storage, StreamCreationError, StreamDeletionError,
};
use crate::stream::{Stream, StreamError};

use std::str::FromStr;

mod creation {
    use super::*;

    #[test]
    fn success() {
        let mut storage = InMemoryBackend::default();
        let uuid = Uuid::new_v4().to_string();

        assert!(storage
            .create_stream(Stream::from_str(&uuid).unwrap())
            .is_ok());
    }

    #[test]
    fn fail_if_stream_exists() {
        let mut storage = InMemoryBackend::default();

        let uuid = Uuid::new_v4().to_string();

        assert!(storage
            .create_stream(Stream::from_str(&uuid).unwrap())
            .is_ok());
        assert_eq!(
            storage.create_stream(Stream::from_str(&uuid).unwrap()),
            Err(StreamCreationError::AlreadyExists)
        );
    }
}

mod deletion {
    use super::*;

    #[test]
    fn success() {
        let mut storage = InMemoryBackend::default();
        let uuid = Uuid::new_v4().to_string();

        assert!(storage
            .create_stream(Stream::from_str(&uuid).unwrap())
            .is_ok());
        assert!(storage
            .delete_stream(&Stream::from_str(&uuid).unwrap())
            .is_ok());
    }

    #[test]
    fn fail_if_stream_doesnt_exists() {
        let mut storage = InMemoryBackend::default();

        let uuid = Uuid::new_v4().to_string();

        assert_eq!(
            storage.delete_stream(&Stream::from_str(&uuid).unwrap()),
            Err(StreamDeletionError::DoesntExists)
        );
    }
}

Next

In the next blog post we will create some restrictions around the append method to prevent unwanted behaviours. In the meantime I will add documentation and other explanation about the project in the repository.

You can see all the work done in this blog post here.



  1. Serde is providing type value for Enum which represents the variant. We could maybe use this kinds of tricks to extract the variant from the serialized event. But it’s too early to define that and there are some problems around this. The Serialize part of the DomainEvent is defined on the client side not in the event_store so we don’t have any control over what’s done. If the end user uses #[serde(untagged)] on his DomainEvent, well, we’re fucked. Feel free to tell me what you’re thinking about this use case in comments! ↩︎

Building Chekovrust

Why clippy doesnt report anything after update/cargo check and how to solve it

comments powered by Disqus