diff --git a/.gitignore b/.gitignore index 088ba6b..4135d20 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk + +.env + diff --git a/Cargo.toml b/Cargo.toml index 05e95ea..66ef288 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,17 +1,27 @@ [package] name = "epoch" -version = "0.1.0" +version = "1.0.0-alpha.16" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["in_memory", "esdb"] +in_memory = [] +esdb = ["dep:eventstore", "dep:uuid", "dep:serde_json"] + [dependencies] async-trait = "0.1.53" -chrono = "0.4.19" -eventstore = "2.1.1" +eventstore = { version = "2.2.0", optional = true } serde = { version = "1.0.136", features = ["derive"] } -serde_json = "1.0.81" +serde_json = { version = "1.0.81", optional = true } thiserror = "1.0" -# Use 0.8.2 only here because of a weird deps bug with eventstore -# 1.x uuids are not compatible with eventstore internal uuids -uuid = { version = "0.8.2", features = ["v4", "serde"] } +uuid = { package = "uuid", version = "1.2.2", features = ["v4", "serde"], optional = true } + +[dev-dependencies] +actix-rt = "2.7.0" +assert_matches = "1.5.0" +const-random = "0.1.15" +autoincrement = "1" +dotenv = "0.15.0" +futures = "0.3.25" diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..aef9042 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,39 @@ +version: "3.9" +services: + redis: + image: "redislabs/rejson:latest" + restart: always + ports: + - '6379:6379' + command: redis-server --save 20 1 --loglevel warning --loadmodule /usr/lib/redis/modules/rejson.so + volumes: + - redis:/data + eventstore.db: + image: eventstore/eventstore:20.10.2-buster-slim + environment: + - EVENTSTORE_CLUSTER_SIZE=1 + - EVENTSTORE_RUN_PROJECTIONS=All + - EVENTSTORE_START_STANDARD_PROJECTIONS=true + - EVENTSTORE_EXT_TCP_PORT=1113 + - EVENTSTORE_HTTP_PORT=2113 + - EVENTSTORE_INSECURE=true + - EVENTSTORE_ENABLE_EXTERNAL_TCP=true + - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true + ports: + - "1113:1113" + - "2113:2113" + volumes: + - type: volume + source: eventstore-volume-data + target: /var/lib/eventstore + - type: volume + source: eventstore-volume-logs + target: /var/log/eventstore + +volumes: + redis: + driver: local + eventstore-volume-data: + driver: local + eventstore-volume-logs: + driver: local diff --git a/epoch/Cargo.toml b/epoch/Cargo.toml deleted file mode 100644 index 05e95ea..0000000 --- a/epoch/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "epoch" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = "0.1.53" -chrono = "0.4.19" -eventstore = "2.1.1" -serde = { version = "1.0.136", features = ["derive"] } -serde_json = "1.0.81" -thiserror = "1.0" -# Use 0.8.2 only here because of a weird deps bug with eventstore -# 1.x uuids are not compatible with eventstore internal uuids -uuid = { version = "0.8.2", features = ["v4", "serde"] } diff --git a/epoch/src/event_store.rs b/epoch/src/event_store.rs deleted file mode 100644 index c6e7765..0000000 --- a/epoch/src/event_store.rs +++ /dev/null @@ -1,230 +0,0 @@ -use std::{ - fmt::{Debug, Display}, - str::FromStr, -}; - -use chrono::{DateTime, Utc}; -use eventstore::{ - AppendToStreamOptions, EventData, ExpectedRevision, ReadStreamOptions, ResolvedEvent, -}; -use serde::{Deserialize, Serialize}; -use thiserror::Error; - -use async_trait::async_trait; -use uuid::Uuid; - -use crate::Event; - -use super::{ - CommmandInvariantError, EventContext, EventEnvelope, EventStore, FromCommandInvariant, - PreparedEvent, ToCommandInvariantError, -}; - -#[derive(Clone)] -pub struct ESDBEventStore { - is_test: bool, - client: eventstore::Client, -} - -#[derive(Error, Debug)] -pub enum Error { - #[error("ESDB ERROR {0}")] - ESDBGeneral(eventstore::Error), - #[error("Error reading stream: {0}")] - ReadStream(eventstore::Error), - #[error("Could not deserialize event {0}")] - DeserializeEvent(serde_json::Error), - #[error("Could not parse event meta {0}")] - ParseMetadata(serde_json::Error), - #[error("Could not serialize event {0}")] - SerializeEventDataPayload(serde_json::Error), - #[error("Could not write to stream {0}: {1}")] - WriteStream(String, eventstore::Error), - #[error("Could not convert EventContext::Id to String {0}")] - ContextIdFromStr(String), - #[error("Command invariant bounds prevented a committed event {0}")] - CommandInvariant(String), -} - -impl FromCommandInvariant for Error { - fn from_command_invariant(cmd_err: Ctx::Err) -> Self - where - Ctx: EventContext, - ::Err: ToCommandInvariantError, - { - // Self::CommandInvariant(cmd_err.to_command_invariant_error()) - Self::CommandInvariant(cmd_err.to_string()) - } -} - -impl ESDBEventStore { - pub fn new(client: eventstore::Client) -> ESDBEventStore { - Self { - client, - is_test: false, - } - } - - // #[cfg(test)] - pub fn new_for_test(client: eventstore::Client) -> ESDBEventStore { - Self { - client, - is_test: true, - } - } - - pub fn stream_id(&self, id: Option) -> String - where - ::Id: Display, - { - let prefix = if self.is_test { "test_" } else { "" }; - - let ctx = Ctx::event_context(); - - match id { - Some(id_str) => format!("{}/{}", ctx, id_str), - None => format!("{}{}", prefix, ctx), - } - } - - fn resolved_to_event_envelope( - resolved_event: &ResolvedEvent, - ) -> Result<(EventEnvelope, ExpectedRevision), Error> - where - Ctx: EventContext, - ::Id: FromStr, - { - let event_data = resolved_event.get_original_event(); - let event = event_data - .as_json::<::Event>() - .map_err(Error::DeserializeEvent)?; - - let EventMetadata { - time, - event_context, - event_context_id, - }: EventMetadata = - serde_json::from_slice(&event_data.custom_metadata).map_err(Error::ParseMetadata)?; - - let event_context_id = match event_context_id { - Some(str) => { - Some(Ctx::Id::from_str(&str.clone()).map_err(|_| Error::ContextIdFromStr(str))?) - } - None => None, - }; - - Ok(( - EventEnvelope { - id: event_data.id, - time, - event_context, - event_context_id, - data: event, - }, - ExpectedRevision::Exact(event_data.revision), - )) - } -} - -#[async_trait] -impl EventStore for ESDBEventStore { - type Error = Error; - type Position = ExpectedRevision; - - async fn load( - &self, - id: Option, - ) -> Result<(Vec>, Self::Position), Self::Error> - where - ::Id: Display + FromStr, - { - let mut stream = self - .client - .read_stream(self.stream_id::(id), &ReadStreamOptions::default()) - .await - .map_err(Self::Error::ESDBGeneral)?; - - let mut evts: Vec = vec![]; - loop { - match stream.next().await { - Ok(Some(event)) => evts.push(event), - Ok(None) => break, - Err(eventstore::Error::ResourceNotFound) => { - return Ok((vec![], ExpectedRevision::NoStream)) - } - Err(e) => return Err(Self::Error::ReadStream(e)), - } - } - - let mut rv = vec![]; - let mut pos = ExpectedRevision::StreamExists; - - for ev in evts { - let (ee, revision) = ESDBEventStore::resolved_to_event_envelope(&ev)?; - - rv.push(ee); - pos = revision; - } - - Ok((rv, pos)) - } - - async fn append( - &self, - position: Self::Position, - event: PreparedEvent, - id: Option<::Id>, - ) -> Result<(EventEnvelope, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync, - ::Id: Clone + Display + ToString, - ::Event: Serialize + Event, - { - let event_meta = EventMetadata { - time: Utc::now(), - event_context: event.event_context, - event_context_id: id.clone().map(|x| x.to_string()), - }; - - let event_id = Uuid::new_v4(); - let event_data = EventData::json(event.data.event_type(), &event.data.clone()) - .map_err(Error::SerializeEventDataPayload)? - .id(event_id.clone()) - .metadata_as_json(event_meta.clone()) - .map_err(Error::SerializeEventDataPayload)?; - - let options = AppendToStreamOptions::default().expected_revision(position); - - let stream_id = self.stream_id::(id.clone()); - - let res = self - .client - .append_to_stream(stream_id.clone(), &options, event_data.clone()) - .await - .map_err(|e| Error::WriteStream(stream_id, e))?; - - let EventMetadata { - time, - event_context, - .. - } = event_meta; - - Ok(( - EventEnvelope { - id: event_id, - time, - event_context, - event_context_id: id, - data: event.data, - }, - ExpectedRevision::Exact(res.next_expected_version), - )) - } -} - -#[derive(Deserialize, Serialize, Clone)] -pub struct EventMetadata { - time: DateTime, - event_context: String, - event_context_id: Option, -} diff --git a/epoch/src/lib.rs b/epoch/src/lib.rs deleted file mode 100644 index 9ea9c6f..0000000 --- a/epoch/src/lib.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::{ - fmt::{Debug, Display}, - str::FromStr, -}; - -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use serde::{de::DeserializeOwned, Serialize}; -use thiserror::Error; -use uuid::Uuid; - -pub mod event_store; - -#[derive(Debug, Clone)] -pub struct EventEnvelope -where - Ctx: EventContext + ?Sized, - ::Id: FromStr + Clone, - ::Event: Event + Clone, -{ - pub id: Uuid, - pub event_context: String, - pub event_context_id: Option, - pub time: DateTime, - pub data: Ctx::Event, -} - -#[derive(Debug, Clone)] -pub struct PreparedEvent -where - Ctx: EventContext + ?Sized, -{ - pub event_context: String, - pub event_context_id: Option, - pub data: Ctx::Event, -} - -pub trait Event { - fn event_type(&self) -> String; -} - -pub trait FromCommandInvariant { - fn from_command_invariant(cmd_err: Ctx::Err) -> Self - where - Ctx: EventContext, - ::Err: ToCommandInvariantError; -} - -#[derive(Error, Debug)] -pub enum CommmandInvariantError { - #[error("Command Invariant: {0}")] - CommandInvariant(String), - // #[error("User Command Invariant Command {0}")] - // Users(UserError) -} - -pub trait ToCommandInvariantError: Display { - fn to_command_invariant_error(&self) -> CommmandInvariantError; -} - -#[async_trait] -pub trait EventContext { - type Id: ToString + FromStr + Send + Sync + Eq + PartialEq + Clone; - type Command; - type Event: Send + Sync + DeserializeOwned + Event + Clone; - type Err: Debug; - type Services; - type State: Default; - - fn event_context() -> String; - - fn to_prepared_event(id: Option, event: Self::Event) -> PreparedEvent { - PreparedEvent { - event_context: Self::event_context(), - event_context_id: id, - data: event, - } - } - - async fn handle( - state: Self::State, - cmd: Self::Command, - services: Self::Services, - ) -> Result, Self::Err>; - - fn apply(state: Self::State, event: &EventEnvelope) -> Self::State; -} - -#[async_trait] -pub trait EventStore { - type Error; - type Position: Eq + PartialEq + Clone + Send; - - async fn load( - &self, - id: Option, - ) -> Result<(Vec>, Self::Position), Self::Error> - where - ::Id: Display + FromStr; - - async fn append( - &self, - position: Self::Position, - event: PreparedEvent, - id: Option, - ) -> Result<(EventEnvelope, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync, - ::Id: Clone + Display, - ::Event: Serialize + Event; - - async fn execute( - &self, - cmd: Ctx::Command, - services: Ctx::Services, - id: Option<::Id>, - ) -> Result<(EventEnvelope, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync + Debug, - ::Id: Display + Clone, - ::Event: Send + Clone + Serialize + Debug, - ::Services: Send, - ::Command: Send, - ::State: Send, - ::Err: Send + Sync + Clone + ToCommandInvariantError, - Self::Error: FromCommandInvariant, - { - let (value, position) = self.get_current_state::(id.clone()).await?; - let res = Ctx::handle(value, cmd, services) - .await - .map_err(Self::Error::from_command_invariant::)?; - - Ok(self.append::(position, res, id).await?) - } - - async fn get_current_state( - &self, - id: Option, - ) -> Result<(Ctx::State, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync, - ::Event: Send + Clone + Debug, - ::Id: Display + Clone, - { - let (evts, position) = self.load::(id).await?; - - Ok(( - evts.iter() - .fold(Ctx::State::default(), |state, evt| Ctx::apply(state, evt)), - position, - )) - } -} diff --git a/src/decider.rs b/src/decider.rs new file mode 100644 index 0000000..03aca7d --- /dev/null +++ b/src/decider.rs @@ -0,0 +1,100 @@ +use std::fmt::Debug; + +struct Placeholder { + value: String +} + +impl Placeholder { + async fn foo(bar: String) -> String { + bar + } +} + +pub trait Event { + type EntityId; + + fn event_type(&self) -> String; + fn get_id(&self) -> Self::EntityId; +} + +pub trait Decider: Evolver { + type Cmd: Send + Sync; + type Err; + + fn decide(state: &Self::State, cmd: &Self::Cmd) -> Result, Self::Err>; +} + +pub trait DeciderWithContext: Evolver + Debug { + type Ctx: std::fmt::Debug; + type Cmd: Send + Sync + std::fmt::Debug; + type Err: std::fmt::Debug; + + fn decide( + ctx: &Self::Ctx, + state: &Self::State, + cmd: &Self::Cmd, + ) -> Result, Self::Err>; +} + +pub trait Evolver { + type State: Debug; + type Evt: Event + Debug; + fn evolve(state: Self::State, event: &Self::Evt) -> Self::State; +} + +#[cfg(test)] +mod tests { + + use assert_matches::assert_matches; + + use crate::{ + repository::{ + event::EventRepository, + in_memory::simple::{InMemoryEventRepository, InMemoryStateRepository}, + state::StateRepository, + }, + test_helpers::{ + deciders::user::{self, UserCommand, UserDecider, UserDeciderState, UserEvent}, + ValueType, + }, + }; + + use super::*; + + #[actix_rt::test] + async fn test_raw_decider() { + let event_repository: InMemoryEventRepository = InMemoryEventRepository::new(); + let mut state_repository: InMemoryStateRepository = + InMemoryStateRepository::new(); + + let state = event_repository + .load() + .await + .expect("Empty Events Vector") + .iter() + .fold(UserDeciderState::default(), UserDecider::evolve); + + let cmd = UserCommand::AddUser("Mike".to_string() as user::UnvalidatedUserName); + let events = ::decide(&state, &cmd).expect("Decider Success"); + + if let Some(UserEvent::UserAdded(user::User { name, id, .. })) = events.clone().first() { + let user_id = *id; + let user_name = name.clone(); + + assert_eq!(name.value(), "Mike".to_string()); + + let state = events.iter().fold(state.clone(), UserDecider::evolve); + + let _ = state_repository.save(&state).await; + assert_eq!(state_repository.reify().await.unwrap(), state.clone()); + + assert_matches!(state.users.get(&user_id).expect("User exists"), user::User { + id, + name, + .. + } if (id == &user_id && name == &user_name)); + } else { + panic!("Events not produced") + } + } +} diff --git a/src/event_store.rs b/src/event_store.rs deleted file mode 100644 index c6e7765..0000000 --- a/src/event_store.rs +++ /dev/null @@ -1,230 +0,0 @@ -use std::{ - fmt::{Debug, Display}, - str::FromStr, -}; - -use chrono::{DateTime, Utc}; -use eventstore::{ - AppendToStreamOptions, EventData, ExpectedRevision, ReadStreamOptions, ResolvedEvent, -}; -use serde::{Deserialize, Serialize}; -use thiserror::Error; - -use async_trait::async_trait; -use uuid::Uuid; - -use crate::Event; - -use super::{ - CommmandInvariantError, EventContext, EventEnvelope, EventStore, FromCommandInvariant, - PreparedEvent, ToCommandInvariantError, -}; - -#[derive(Clone)] -pub struct ESDBEventStore { - is_test: bool, - client: eventstore::Client, -} - -#[derive(Error, Debug)] -pub enum Error { - #[error("ESDB ERROR {0}")] - ESDBGeneral(eventstore::Error), - #[error("Error reading stream: {0}")] - ReadStream(eventstore::Error), - #[error("Could not deserialize event {0}")] - DeserializeEvent(serde_json::Error), - #[error("Could not parse event meta {0}")] - ParseMetadata(serde_json::Error), - #[error("Could not serialize event {0}")] - SerializeEventDataPayload(serde_json::Error), - #[error("Could not write to stream {0}: {1}")] - WriteStream(String, eventstore::Error), - #[error("Could not convert EventContext::Id to String {0}")] - ContextIdFromStr(String), - #[error("Command invariant bounds prevented a committed event {0}")] - CommandInvariant(String), -} - -impl FromCommandInvariant for Error { - fn from_command_invariant(cmd_err: Ctx::Err) -> Self - where - Ctx: EventContext, - ::Err: ToCommandInvariantError, - { - // Self::CommandInvariant(cmd_err.to_command_invariant_error()) - Self::CommandInvariant(cmd_err.to_string()) - } -} - -impl ESDBEventStore { - pub fn new(client: eventstore::Client) -> ESDBEventStore { - Self { - client, - is_test: false, - } - } - - // #[cfg(test)] - pub fn new_for_test(client: eventstore::Client) -> ESDBEventStore { - Self { - client, - is_test: true, - } - } - - pub fn stream_id(&self, id: Option) -> String - where - ::Id: Display, - { - let prefix = if self.is_test { "test_" } else { "" }; - - let ctx = Ctx::event_context(); - - match id { - Some(id_str) => format!("{}/{}", ctx, id_str), - None => format!("{}{}", prefix, ctx), - } - } - - fn resolved_to_event_envelope( - resolved_event: &ResolvedEvent, - ) -> Result<(EventEnvelope, ExpectedRevision), Error> - where - Ctx: EventContext, - ::Id: FromStr, - { - let event_data = resolved_event.get_original_event(); - let event = event_data - .as_json::<::Event>() - .map_err(Error::DeserializeEvent)?; - - let EventMetadata { - time, - event_context, - event_context_id, - }: EventMetadata = - serde_json::from_slice(&event_data.custom_metadata).map_err(Error::ParseMetadata)?; - - let event_context_id = match event_context_id { - Some(str) => { - Some(Ctx::Id::from_str(&str.clone()).map_err(|_| Error::ContextIdFromStr(str))?) - } - None => None, - }; - - Ok(( - EventEnvelope { - id: event_data.id, - time, - event_context, - event_context_id, - data: event, - }, - ExpectedRevision::Exact(event_data.revision), - )) - } -} - -#[async_trait] -impl EventStore for ESDBEventStore { - type Error = Error; - type Position = ExpectedRevision; - - async fn load( - &self, - id: Option, - ) -> Result<(Vec>, Self::Position), Self::Error> - where - ::Id: Display + FromStr, - { - let mut stream = self - .client - .read_stream(self.stream_id::(id), &ReadStreamOptions::default()) - .await - .map_err(Self::Error::ESDBGeneral)?; - - let mut evts: Vec = vec![]; - loop { - match stream.next().await { - Ok(Some(event)) => evts.push(event), - Ok(None) => break, - Err(eventstore::Error::ResourceNotFound) => { - return Ok((vec![], ExpectedRevision::NoStream)) - } - Err(e) => return Err(Self::Error::ReadStream(e)), - } - } - - let mut rv = vec![]; - let mut pos = ExpectedRevision::StreamExists; - - for ev in evts { - let (ee, revision) = ESDBEventStore::resolved_to_event_envelope(&ev)?; - - rv.push(ee); - pos = revision; - } - - Ok((rv, pos)) - } - - async fn append( - &self, - position: Self::Position, - event: PreparedEvent, - id: Option<::Id>, - ) -> Result<(EventEnvelope, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync, - ::Id: Clone + Display + ToString, - ::Event: Serialize + Event, - { - let event_meta = EventMetadata { - time: Utc::now(), - event_context: event.event_context, - event_context_id: id.clone().map(|x| x.to_string()), - }; - - let event_id = Uuid::new_v4(); - let event_data = EventData::json(event.data.event_type(), &event.data.clone()) - .map_err(Error::SerializeEventDataPayload)? - .id(event_id.clone()) - .metadata_as_json(event_meta.clone()) - .map_err(Error::SerializeEventDataPayload)?; - - let options = AppendToStreamOptions::default().expected_revision(position); - - let stream_id = self.stream_id::(id.clone()); - - let res = self - .client - .append_to_stream(stream_id.clone(), &options, event_data.clone()) - .await - .map_err(|e| Error::WriteStream(stream_id, e))?; - - let EventMetadata { - time, - event_context, - .. - } = event_meta; - - Ok(( - EventEnvelope { - id: event_id, - time, - event_context, - event_context_id: id, - data: event.data, - }, - ExpectedRevision::Exact(res.next_expected_version), - )) - } -} - -#[derive(Deserialize, Serialize, Clone)] -pub struct EventMetadata { - time: DateTime, - event_context: String, - event_context_id: Option, -} diff --git a/src/lib.rs b/src/lib.rs index 9ea9c6f..6f34de6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,153 +1,6 @@ -use std::{ - fmt::{Debug, Display}, - str::FromStr, -}; +pub mod decider; +pub mod repository; +pub mod strategies; -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use serde::{de::DeserializeOwned, Serialize}; -use thiserror::Error; -use uuid::Uuid; - -pub mod event_store; - -#[derive(Debug, Clone)] -pub struct EventEnvelope -where - Ctx: EventContext + ?Sized, - ::Id: FromStr + Clone, - ::Event: Event + Clone, -{ - pub id: Uuid, - pub event_context: String, - pub event_context_id: Option, - pub time: DateTime, - pub data: Ctx::Event, -} - -#[derive(Debug, Clone)] -pub struct PreparedEvent -where - Ctx: EventContext + ?Sized, -{ - pub event_context: String, - pub event_context_id: Option, - pub data: Ctx::Event, -} - -pub trait Event { - fn event_type(&self) -> String; -} - -pub trait FromCommandInvariant { - fn from_command_invariant(cmd_err: Ctx::Err) -> Self - where - Ctx: EventContext, - ::Err: ToCommandInvariantError; -} - -#[derive(Error, Debug)] -pub enum CommmandInvariantError { - #[error("Command Invariant: {0}")] - CommandInvariant(String), - // #[error("User Command Invariant Command {0}")] - // Users(UserError) -} - -pub trait ToCommandInvariantError: Display { - fn to_command_invariant_error(&self) -> CommmandInvariantError; -} - -#[async_trait] -pub trait EventContext { - type Id: ToString + FromStr + Send + Sync + Eq + PartialEq + Clone; - type Command; - type Event: Send + Sync + DeserializeOwned + Event + Clone; - type Err: Debug; - type Services; - type State: Default; - - fn event_context() -> String; - - fn to_prepared_event(id: Option, event: Self::Event) -> PreparedEvent { - PreparedEvent { - event_context: Self::event_context(), - event_context_id: id, - data: event, - } - } - - async fn handle( - state: Self::State, - cmd: Self::Command, - services: Self::Services, - ) -> Result, Self::Err>; - - fn apply(state: Self::State, event: &EventEnvelope) -> Self::State; -} - -#[async_trait] -pub trait EventStore { - type Error; - type Position: Eq + PartialEq + Clone + Send; - - async fn load( - &self, - id: Option, - ) -> Result<(Vec>, Self::Position), Self::Error> - where - ::Id: Display + FromStr; - - async fn append( - &self, - position: Self::Position, - event: PreparedEvent, - id: Option, - ) -> Result<(EventEnvelope, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync, - ::Id: Clone + Display, - ::Event: Serialize + Event; - - async fn execute( - &self, - cmd: Ctx::Command, - services: Ctx::Services, - id: Option<::Id>, - ) -> Result<(EventEnvelope, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync + Debug, - ::Id: Display + Clone, - ::Event: Send + Clone + Serialize + Debug, - ::Services: Send, - ::Command: Send, - ::State: Send, - ::Err: Send + Sync + Clone + ToCommandInvariantError, - Self::Error: FromCommandInvariant, - { - let (value, position) = self.get_current_state::(id.clone()).await?; - let res = Ctx::handle(value, cmd, services) - .await - .map_err(Self::Error::from_command_invariant::)?; - - Ok(self.append::(position, res, id).await?) - } - - async fn get_current_state( - &self, - id: Option, - ) -> Result<(Ctx::State, Self::Position), Self::Error> - where - Ctx: EventContext + Send + Sync, - ::Event: Send + Clone + Debug, - ::Id: Display + Clone, - { - let (evts, position) = self.load::(id).await?; - - Ok(( - evts.iter() - .fold(Ctx::State::default(), |state, evt| Ctx::apply(state, evt)), - position, - )) - } -} +#[cfg(test)] +mod test_helpers; diff --git a/src/repository/esdb/error.rs b/src/repository/esdb/error.rs new file mode 100644 index 0000000..c49f468 --- /dev/null +++ b/src/repository/esdb/error.rs @@ -0,0 +1,15 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("ESDB Error {0}")] + ESDBGeneral(eventstore::Error), + #[error("Error reading stream: {0}")] + ReadStream(eventstore::Error), + #[error("Could not deserialize event {0}")] + DeserializeEvent(serde_json::Error), + #[error("Could not serialize event {0}")] + SerializeEventDataPayload(serde_json::Error), + #[error("Could not write to stream {0}: {1}")] + WriteStream(String, eventstore::Error), +} diff --git a/src/repository/esdb/mod.rs b/src/repository/esdb/mod.rs new file mode 100644 index 0000000..2bc3182 --- /dev/null +++ b/src/repository/esdb/mod.rs @@ -0,0 +1,368 @@ +use std::{fmt::Debug, marker::PhantomData}; + +use async_trait::async_trait; +use eventstore::{ + AppendToStreamOptions, Client, CurrentRevision, EventData, ExpectedRevision, ReadStreamOptions, + ResolvedEvent, StreamPosition, +}; +use serde::{de::DeserializeOwned, Serialize}; +use uuid::Uuid; + +use crate::decider::Event; + +use self::error::Error; + +use super::{ + event::{VersionDiff, VersionedEventRepositoryWithStreams, VersionedRepositoryError}, + RepositoryVersion, +}; + +pub mod error; + +#[derive(Clone)] +pub struct ESDBEventRepository { + client: Client, + stream_name: String, + _hidden: PhantomData, +} + +impl ESDBEventRepository { + pub fn new(client: &Client, stream_name: &str) -> Self { + Self { + client: client.to_owned(), + stream_name: stream_name.to_owned(), + _hidden: PhantomData::default(), + } + } + + fn get_stream(&self, stream_id: Option<&String>) -> String { + if let Some(id) = stream_id { + format!("{}-{}", self.stream_name, id) + } else { + format!("$ce-{}", self.stream_name) + } + } + + fn version_to_esdb_position(version: &RepositoryVersion) -> StreamPosition { + if let RepositoryVersion::Exact(u) = version { + StreamPosition::Position(u.to_owned().try_into().unwrap()) + } else { + StreamPosition::Start + } + } + + fn version_to_expected_revision(version: &RepositoryVersion) -> ExpectedRevision { + match version { + RepositoryVersion::Exact(u) => { + ExpectedRevision::Exact(u.to_owned().try_into().unwrap()) + } + _ => ExpectedRevision::Any, + } + } + + fn current_revision_to_version(revision: &CurrentRevision) -> RepositoryVersion { + match revision { + CurrentRevision::Current(val) => RepositoryVersion::Exact(*val as usize), + CurrentRevision::NoStream => RepositoryVersion::NoStream, + } + } +} + +#[async_trait(?Send)] +impl<'a, E> VersionedEventRepositoryWithStreams<'a, E, Error> for ESDBEventRepository +where + E: Event + Sync + Send + Serialize + DeserializeOwned + Clone + Debug, +{ + type StreamId = String; + + async fn load( + &self, + id: Option<&Self::StreamId>, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError> + where + E: Send + { + self.load_from_version(&RepositoryVersion::Any, id).await + } + + async fn load_from_version( + &self, + version: &RepositoryVersion, + id: Option<&Self::StreamId>, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError> + where + E: Send + { + let mut stream = self + .client + .read_stream( + self.get_stream(id), + &ReadStreamOptions::default() + .resolve_link_tos() + .position(Self::version_to_esdb_position(version)), + ) + .await + .map_err(Error::ESDBGeneral) + .map_err(VersionedRepositoryError::RepoErr)?; + + let mut evts: Vec = vec![]; + + loop { + match stream.next().await { + Ok(Some(event)) => evts.push(event), + Ok(None) => break, + Err(eventstore::Error::ResourceNotFound) => { + return Ok((vec![], RepositoryVersion::NoStream)) + } + Err(e) => return Err(VersionedRepositoryError::RepoErr(Error::ReadStream(e))), + } + } + + let mut rv = vec![]; + let mut pos = RepositoryVersion::StreamExists; + + for ev in evts { + pos = RepositoryVersion::Exact(ev.get_original_event().revision.try_into().unwrap()); + + if let Some(event_data) = ev.event { + // Continue on deser failure - occasionally you'll get delete and other system types in the stream + if let Ok(event) = event_data.as_json::().map_err(Error::DeserializeEvent) { + rv.push(event); + } + } + } + + Ok((rv, pos)) + } + + async fn append( + &mut self, + version: &RepositoryVersion, + stream: &Self::StreamId, + events: &Vec, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError> + where + 'a: 'async_trait, + E: 'async_trait + Send + Sync, + { + let mut perpared_events = vec![]; + + for e in events { + let ed = EventData::json(e.event_type(), e) + .map(|ed| ed.id(Uuid::new_v4())) + .map_err(Error::SerializeEventDataPayload) + .map_err(VersionedRepositoryError::RepoErr)?; + + perpared_events.push(ed); + } + + let res = self + .client + .append_to_stream( + self.get_stream(Some(stream)), + &AppendToStreamOptions::default() + .expected_revision(Self::version_to_expected_revision(version)), + perpared_events, + ) + .await + .map_err(|e| { + if let eventstore::Error::WrongExpectedVersion { current, .. } = e { + VersionedRepositoryError::VersionConflict(VersionDiff::new( + *version, + Self::current_revision_to_version(¤t), + )) + } else { + VersionedRepositoryError::RepoErr(Error::WriteStream(stream.to_owned(), e)) + } + })?; + + Ok(( + events.to_owned(), + RepositoryVersion::Exact(res.next_expected_version.try_into().unwrap()), + )) + } +} + +#[cfg(test)] +mod tests { + use const_random::const_random; + use core::time; + use futures::{ + future::{self, BoxFuture}, + FutureExt, + }; + use std::{ + collections::{HashMap, HashSet}, + thread, + }; + + use assert_matches::assert_matches; + use eventstore::DeleteStreamOptions; + + use crate::strategies::{LoadDecideAppend, StateFromEventRepository, StreamState}; + + use crate::test_helpers::{ + deciders::user::{ + Guitar, User, UserCommand, UserDecider, UserDeciderCtx, UserDeciderState, UserEvent, + UserId, UserName, + }, + repository::test_versioned_event_repository_with_streams, + ValueType, + }; + + use super::*; + + const BASE_STREAM: u32 = const_random!(u32); + + async fn store_from_environment(base_stream: &str, ids: Vec) -> eventstore::Client { + let _ = dotenv::dotenv().expect("File .env or Env Vars not found"); + let settings = dotenv::var("ESDB_CONNECTION_STRING") + .expect("ESDB to be set in env") + .parse() + .expect("ESDB connection string to parse"); + + let client = eventstore::Client::new(settings).expect("Eventstore client"); + + for id in ids { + let _ = client + .delete_stream( + format!("{}-{}", base_stream, id), + &DeleteStreamOptions::default(), + ) + .await; + } + + client + } + + async fn add_guitar(base_stream: String, user_id: UserId, guitar: Guitar) { + let ctx = UserDeciderCtx::new(); + let client = store_from_environment(&base_stream.to_string(), vec![]).await; + let mut event_repository = + ESDBEventRepository::::new(&client, &base_stream.to_string()); + + println!("Adding Guitar {:?} for user {}", &guitar.brand, &user_id); + + let cmd = UserCommand::AddGuitar(user_id, guitar.to_owned()); + + let res = UserDecider::execute( + UserDeciderState::default(), + &mut event_repository, + &StreamState::Existing(user_id.to_string()), + &ctx, + &cmd, + None, + ) + .await; + + println!( + "Result for Guitar {:?} for user {}: {:?}", + &guitar.brand, &user_id, res + ); + } + + #[actix_rt::test] + async fn repository_spec_test() { + let base_stream = BASE_STREAM; + let client = store_from_environment(&base_stream.to_string(), vec![1, 2]).await; + let event_repository = + ESDBEventRepository::::new(&client, &base_stream.to_string()); + + let _ = test_versioned_event_repository_with_streams(event_repository).await; + } + + #[actix_rt::test] + async fn test_occ() { + let base_stream = format!("{}_with_occ", BASE_STREAM); + let client = store_from_environment(&base_stream.to_string(), vec![1]).await; + let mut event_repository = + ESDBEventRepository::::new(&client, &base_stream.to_string()); + let ctx = UserDeciderCtx::new(); + + let cmd1 = UserCommand::AddUser("Mike".to_string()); + + let evts = UserDecider::execute( + UserDeciderState::default(), + &mut event_repository, + &StreamState::New, + &ctx, + &cmd1, + None, + ) + .await + .expect("command_succeeds"); + + let first_id = evts.first().unwrap().get_id(); + + assert_matches!( + evts.first().expect("one event"), + UserEvent::UserAdded(User { id, name, .. }) if (&first_id == id) && (name.value() == "Mike".to_string()) + ); + + let state = UserDeciderState::load_by_id( + UserDeciderState::default(), + &event_repository, + &first_id.to_string(), + ) + .await + .expect("state is loaded"); + + assert_matches!( + state, + UserDeciderState { users } if users == HashMap::from([(first_id.clone(), User::new(first_id, UserName::try_from("Mike".to_string()).unwrap()))]) + ); + + let guitars = vec![ + Guitar { + brand: "Ibanez".to_string(), + }, + Guitar { + brand: "Gibson".to_string(), + }, + Guitar { + brand: "Fender".to_string(), + }, + Guitar { + brand: "Eastman".to_string(), + }, + Guitar { + brand: "Meyones".to_string(), + }, + Guitar { + brand: "PRS".to_string(), + }, + Guitar { + brand: "Yamaha".to_string(), + }, + Guitar { + brand: "Benedetto".to_string(), + }, + Guitar { + brand: "Strandberg".to_string(), + }, + ]; + + let futures = guitars + .iter() + .cloned() + .map(|g| add_guitar(base_stream.clone(), first_id.clone(), g).boxed()) + .collect::>>(); + + future::join_all(futures).await; + + thread::sleep(time::Duration::from_secs(1)); + + let state = UserDeciderState::load_by_id( + UserDeciderState::default(), + &event_repository, + &first_id.to_string(), + ) + .await + .expect("state is loaded"); + + assert_eq!( + state.users.get(&first_id).unwrap().guitars, + HashSet::from_iter(guitars.iter().cloned()) + ); + } +} diff --git a/src/repository/event.rs b/src/repository/event.rs new file mode 100644 index 0000000..d79e4aa --- /dev/null +++ b/src/repository/event.rs @@ -0,0 +1,103 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use thiserror::Error; + +use super::RepositoryVersion; +use crate::decider::Event; + +#[async_trait(?Send)] +pub trait EventRepository +where + E: Event, +{ + async fn load(&self) -> Result, Err>; + async fn append(&mut self, events: &Vec) -> Result, Err>; +} + +#[async_trait(?Send)] +pub trait VersionedEventRepository +where + E: Event, +{ + type Version: Eq; + + async fn load(&self) -> Result<(Vec, &Self::Version), Err>; + async fn load_from_version(&self) -> Result<(Vec, Option<&Self::Version>), Err>; + async fn append( + &mut self, + version: &Self::Version, + events: &Vec, + ) -> Result<(Vec, Self::Version), Err>; +} + +// Lifetimes added here to fix codegen issue with macro generated lifetimes - adding 'a and 'async_trait prevents +// compile errors about E not living long enough for fn append +// https://stackoverflow.com/questions/69560112/how-to-use-rust-async-trait-generic-to-a-lifetime-parameter +// https://github.com/dtolnay/async-trait/issues/8 +// https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=e977da3ddc0c21639b3116e123a94b6f +#[async_trait(?Send)] +pub trait VersionedEventRepositoryWithStreams<'a, E, Err> +where + E: Event + Debug, + Err: Debug + Send, +{ + type StreamId; + + async fn load( + &self, + id: Option<&Self::StreamId>, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError>; + + async fn load_from_version( + &self, + version: &RepositoryVersion, + id: Option<&Self::StreamId>, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError>; + + async fn append( + &mut self, + version: &RepositoryVersion, + stream: &Self::StreamId, + events: &Vec, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError> + where + 'a: 'async_trait, + E: 'async_trait; +} + +#[derive(Debug, Error)] +pub enum VersionedRepositoryError { + #[error("Version conflict {0:?}")] + VersionConflict(VersionDiff), + #[error("Repository Error {0}")] + RepoErr(RepoErr), +} + +pub trait StreamIdFromEvent: Sized { + fn from(e: Evt) -> Self { + Self::event_entity_id_into(e.get_id()) + } + + fn event_entity_id_into(id: ::EntityId) -> Self; +} + +#[derive(Debug)] +pub struct VersionDiff { + expected: RepositoryVersion, + actual: RepositoryVersion, +} + +impl VersionDiff { + pub fn new(expected: RepositoryVersion, actual: RepositoryVersion) -> Self { + Self { expected, actual } + } + + pub fn expected(&self) -> RepositoryVersion { + self.expected.to_owned() + } + + pub fn actual(&self) -> RepositoryVersion { + self.actual.to_owned() + } +} diff --git a/src/repository/in_memory/mod.rs b/src/repository/in_memory/mod.rs new file mode 100644 index 0000000..39844e7 --- /dev/null +++ b/src/repository/in_memory/mod.rs @@ -0,0 +1,11 @@ +use std::fmt::Debug; + +pub mod simple; +pub mod state; +pub mod versioned_with_streams; + +#[derive(Debug, Default)] +pub(crate) struct InMemoryEventRepositoryState { + events: Vec, + position: usize, +} diff --git a/src/repository/in_memory/simple.rs b/src/repository/in_memory/simple.rs new file mode 100644 index 0000000..db47ce9 --- /dev/null +++ b/src/repository/in_memory/simple.rs @@ -0,0 +1,99 @@ +use std::{ + fmt::Debug, + sync::{Arc, Mutex}, +}; + +use async_trait::async_trait; + +use crate::{ + decider::Event, + repository::{event::EventRepository, state::StateRepository}, +}; + +use super::InMemoryEventRepositoryState; + +#[derive(Default)] +pub struct InMemoryEventRepository +where + E: Event + Clone, +{ + state: Arc>>, +} + +impl InMemoryEventRepository +where + E: Event + Clone, +{ + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(InMemoryEventRepositoryState::new())), + } + } +} + +#[async_trait(?Send)] +impl EventRepository for InMemoryEventRepository +where + E: Event + Clone, +{ + async fn load(&self) -> Result, ()> { + let lock = self.state.lock().unwrap(); + Ok(lock.events.clone()) + } + + async fn append(&mut self, events: &Vec) -> Result, ()> { + let mut lock = self.state.lock().unwrap(); + lock.events.extend(events.to_owned()); + lock.position = lock.events.len(); + + Ok(events.clone()) + } +} + +impl InMemoryEventRepositoryState { + pub fn new() -> Self { + InMemoryEventRepositoryState { + events: vec![], + position: 0, + } + } +} + +pub struct InMemoryStateRepository { + state: State, +} + +impl InMemoryStateRepository +where + State: Default + Debug + Clone, +{ + pub fn new() -> Self { + Self::default() + } +} + +impl Default for InMemoryStateRepository +where + State: Default, +{ + fn default() -> Self { + Self { + state: State::default(), + } + } +} + +#[async_trait(?Send)] +impl StateRepository for InMemoryStateRepository +where + State: Default + Debug + Clone, +{ + async fn reify(&self) -> Result { + Ok(self.state.clone()) + } + + async fn save(&mut self, state: &State) -> Result { + self.state = state.clone(); + Ok(self.state.to_owned()) + } +} diff --git a/src/repository/in_memory/state/mod.rs b/src/repository/in_memory/state/mod.rs new file mode 100644 index 0000000..abb8335 --- /dev/null +++ b/src/repository/in_memory/state/mod.rs @@ -0,0 +1 @@ +pub mod versioned; diff --git a/src/repository/in_memory/state/versioned.rs b/src/repository/in_memory/state/versioned.rs new file mode 100644 index 0000000..c573308 --- /dev/null +++ b/src/repository/in_memory/state/versioned.rs @@ -0,0 +1,141 @@ +use std::{ + fmt::Debug, + sync::{Arc, Mutex}, +}; + +use async_trait::async_trait; + +use crate::repository::{ + event::{VersionDiff, VersionedRepositoryError}, + state::VersionedStateRepository, + RepositoryVersion, +}; + +#[derive(Debug, Clone)] +pub struct InMemoryStateRepository +where + State: Debug, +{ + state: Arc>>, +} + +impl InMemoryStateRepository +where + State: Debug, +{ + pub fn new(state: State) -> Self { + Self { + state: Arc::new(Mutex::new(VersionedState::new(state))), + } + } + + fn version_to_usize( + version: &RepositoryVersion, + ) -> Result> { + match version { + RepositoryVersion::Exact(exact) => Ok(exact.to_owned()), + RepositoryVersion::NoStream => Ok(0), + RepositoryVersion::StreamExists => Ok(0), + RepositoryVersion::Any => Err(VersionedRepositoryError::RepoErr( + Error::ExactStreamVersionMustBeKnown, + )), + } + } + + fn version_check( + current: &RepositoryVersion, + incoming: &RepositoryVersion, + ) -> Result<(), VersionedRepositoryError> { + if Self::version_to_usize(current)? == Self::version_to_usize(incoming)? { + Ok(()) + } else { + Err(VersionedRepositoryError::VersionConflict(VersionDiff::new( + current.to_owned(), + incoming.to_owned(), + ))) + } + } + + fn bump_version( + version: &RepositoryVersion, + ) -> Result> { + Ok(RepositoryVersion::Exact( + Self::version_to_usize(version)? + 1, + )) + } +} + +#[async_trait] +impl<'a, State> VersionedStateRepository<'a, State, Error> for InMemoryStateRepository +where + State: Debug + Clone + Send + Sync, +{ + type Version = RepositoryVersion; + + async fn reify(&self) -> Result<(State, Self::Version), Error> { + let handle = self.state.lock().unwrap(); + + Ok((handle.data.to_owned(), handle.version)) + } + + async fn save( + &mut self, + version: &Self::Version, + state: &State, + ) -> Result> { + let handle_lock = self.state.lock(); + let mut handle = handle_lock.unwrap(); + + let _ = Self::version_check(&handle.version, version)?; + + handle.data = state.clone(); + handle.version = Self::bump_version(&handle.version)?; + + drop(handle); + + Ok(state.to_owned()) + } +} + +#[derive(Debug, Clone)] +struct VersionedState +where + State: Debug, +{ + data: State, + version: RepositoryVersion, +} + +impl VersionedState +where + State: Debug, +{ + fn new(data: State) -> Self { + Self { + data, + version: RepositoryVersion::StreamExists, + } + } +} + +#[derive(Debug, Clone)] +pub enum Error { + ExactStreamVersionMustBeKnown, +} + +#[cfg(test)] +mod tests { + use crate::test_helpers::{ + deciders::user::UserDeciderState, repository::test_versioned_state_repository, + }; + + use super::*; + + #[actix_rt::test] + async fn repository_spec_test() { + let state_repository: InMemoryStateRepository = + InMemoryStateRepository::new(UserDeciderState::default()); + + test_versioned_state_repository(state_repository).await; + } +} diff --git a/src/repository/in_memory/versioned_with_streams/error.rs b/src/repository/in_memory/versioned_with_streams/error.rs new file mode 100644 index 0000000..e623ee5 --- /dev/null +++ b/src/repository/in_memory/versioned_with_streams/error.rs @@ -0,0 +1,9 @@ +use thiserror::Error; + +use crate::repository::event::VersionDiff; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Cannot append, event version is out of date")] + VersionConflict(VersionDiff), +} diff --git a/src/repository/in_memory/versioned_with_streams/mod.rs b/src/repository/in_memory/versioned_with_streams/mod.rs new file mode 100644 index 0000000..bf37437 --- /dev/null +++ b/src/repository/in_memory/versioned_with_streams/mod.rs @@ -0,0 +1,174 @@ +use async_trait::async_trait; + +use std::{ + collections::HashMap, + fmt::Debug, + sync::{Arc, Mutex}, +}; + +use crate::{ + decider::Event, + repository::{ + event::{VersionDiff, VersionedEventRepositoryWithStreams, VersionedRepositoryError}, + RepositoryVersion, + }, +}; + +use super::InMemoryEventRepositoryState; +use error::Error; + +pub mod error; + +#[derive(Clone)] +pub struct InMemoryEventRepository +where + E: Event + Debug, +{ + stream_name: String, + state: HashMap>>>, +} + +impl InMemoryEventRepository +where + E: Event + Debug, +{ + pub fn new(stream_name: &str) -> Self { + Self { + stream_name: stream_name.to_owned(), + state: HashMap::default(), + } + } + + fn get_base_stream_key(&self) -> String { + self.stream_name.to_owned() + } + + fn get_stream_key(&self, stream_id: Option<&String>) -> String { + if let Some(id) = stream_id { + format!("{}/{}", self.stream_name, id) + } else { + self.stream_name.to_string() + } + } + + fn get_stream_or_new(&mut self, key: &str) -> &Arc>> { + if self.state.get(key).is_none() { + self.state.insert( + key.to_owned(), + Arc::new(Mutex::new(InMemoryEventRepositoryState::new())), + ); + } + + self.state.get(key).unwrap() + } + + fn index_from_version(version: &RepositoryVersion) -> usize { + match version { + RepositoryVersion::Exact(v) => *v, + _ => 0, + } + } + + fn version_from_index(index: &usize) -> RepositoryVersion { + RepositoryVersion::Exact(*index) + } +} + +#[async_trait(?Send)] +impl<'a, E> VersionedEventRepositoryWithStreams<'a, E, Error> for InMemoryEventRepository +where + E: Event + Clone + Debug, +{ + type StreamId = String; + + async fn load( + &self, + id: Option<&Self::StreamId>, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError> { + self.load_from_version(&RepositoryVersion::Any, id).await + } + async fn load_from_version( + &self, + version: &RepositoryVersion, + id: Option<&Self::StreamId>, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError> { + let stream_key = self.get_stream_key(id); + + if let Some(m) = self.state.get(&stream_key) { + let stream_state = m.lock().unwrap(); + + let start = Self::index_from_version(version); + let end = stream_state.position + 1; + + return Ok(( + stream_state.events[start..end].to_vec(), + RepositoryVersion::Exact(stream_state.position), + )); + } else { + return Ok((vec![], RepositoryVersion::Exact(0))); + } + } + async fn append( + &mut self, + version: &RepositoryVersion, + stream: &Self::StreamId, + events: &Vec, + ) -> Result<(Vec, RepositoryVersion), VersionedRepositoryError> + where + 'a: 'async_trait, + E: 'async_trait, + { + let stream_key = self.get_stream_key(Some(stream)); + + let mut stream = self.get_stream_or_new(&stream_key).lock().unwrap(); + + if stream.position == Self::index_from_version(version) { + stream.events.extend(events.clone()); + let position = stream.events.len() - 1; + stream.position = position; + + drop(stream); // Drop mutable reference so we can pull another and write to sub_stream + + let mut sub_stream = self + .get_stream_or_new(&self.get_base_stream_key()) + .lock() + .unwrap(); + sub_stream.events.extend(events.clone()); + let sub_position = sub_stream.events.len() - 1; + sub_stream.position = sub_position; + + Ok((events.to_owned(), RepositoryVersion::Exact(position))) + } else { + Err(Error::VersionConflict(VersionDiff::new( + *version, + Self::version_from_index(&stream.position), + )) + .into()) + } + } +} + +impl From for VersionedRepositoryError { + fn from(value: Error) -> Self { + let Error::VersionConflict(diff) = value; + + VersionedRepositoryError::VersionConflict(diff) + } +} + +#[cfg(test)] +mod tests { + use crate::test_helpers::{ + deciders::user::UserEvent, repository::test_versioned_event_repository_with_streams, + }; + + use super::InMemoryEventRepository; + + const BASE_STREAM: &str = "test"; + + #[actix_rt::test] + async fn repository_spec_test() { + let event_repository = InMemoryEventRepository::::new(BASE_STREAM); + let _ = test_versioned_event_repository_with_streams(event_repository).await; + } +} diff --git a/src/repository/mod.rs b/src/repository/mod.rs new file mode 100644 index 0000000..d3e763a --- /dev/null +++ b/src/repository/mod.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "esdb")] +pub mod esdb; +pub mod event; +#[cfg(feature = "in_memory")] +pub mod in_memory; +pub mod state; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum RepositoryVersion { + Any, + Exact(usize), + NoStream, + StreamExists, +} diff --git a/src/repository/state.rs b/src/repository/state.rs new file mode 100644 index 0000000..d63c04e --- /dev/null +++ b/src/repository/state.rs @@ -0,0 +1,28 @@ +use async_trait::async_trait; + +use super::event::VersionedRepositoryError; + +#[async_trait(?Send)] +pub trait StateRepository { + async fn reify(&self) -> Result; + async fn save(&mut self, state: &State) -> Result; +} + +#[async_trait(?Send)] +pub trait VersionedStateRepository<'a, State, Err> +where + Err: Send +{ + type Version; + + async fn reify(&self) -> Result<(State, Self::Version), Err>; + async fn save( + &mut self, + version: &Self::Version, + state: &State, + ) -> Result> + where + 'a: 'async_trait, + State: 'async_trait, + Err: 'async_trait; +} diff --git a/src/strategies/mod.rs b/src/strategies/mod.rs new file mode 100644 index 0000000..2da96a2 --- /dev/null +++ b/src/strategies/mod.rs @@ -0,0 +1,449 @@ +use core::time; +use std::{fmt::Debug, thread}; + +use crate::{ + decider::{DeciderWithContext, Evolver}, + repository::{ + self, event::VersionedRepositoryError, state::VersionedStateRepository, RepositoryVersion, + }, +}; +use async_trait::async_trait; +use repository::event::{StreamIdFromEvent, VersionedEventRepositoryWithStreams}; + +#[async_trait(?Send)] +pub trait StateFromEventRepository +{ + type Ev: Evolver + Send + Sync; + + async fn load<'a, Err: Send>( + initial: ::State, + event_repository: &(impl VersionedEventRepositoryWithStreams< + 'a, + ::Evt, + Err, + >), + ) -> Result<::State, VersionedRepositoryError> + where + Err: Debug, + { + Ok(event_repository + .load(None) + .await? + .0 + .iter() + .fold(initial, Self::Ev::evolve)) + } + + async fn load_by_id<'a, Err, StreamId>( + initial: ::State, + event_repository: &(impl VersionedEventRepositoryWithStreams< + 'a, + ::Evt, + Err, + StreamId = StreamId, + > + Send + + Sync), + stream_id: &StreamId, + ) -> Result<::State, VersionedRepositoryError> + where + Err: Debug + Send + Sync, + StreamId: Send + Sync, + { + Ok(event_repository + .load(Some(stream_id)) + .await? + .0 + .iter() + .fold(initial, Self::Ev::evolve)) + } +} + +#[async_trait(?Send)] +pub trait LoadDecideAppend +where + ::Evt: Clone + Send, + ::Err: Send +{ + type Decide: DeciderWithContext; + + fn to_lda_error( + err: VersionedRepositoryError, + ) -> LoadDecideAppendError { + match err { + VersionedRepositoryError::VersionConflict(_) => LoadDecideAppendError::VersionError, + VersionedRepositoryError::RepoErr(e) => LoadDecideAppendError::RepositoryErr(e), + } + } + + async fn execute<'a, RepoErr: Send, StreamId>( + initial: ::State, + event_repository: &mut (impl VersionedEventRepositoryWithStreams< + 'a, + ::Evt, + RepoErr, + StreamId = StreamId, + >), + stream_id: &StreamState, + ctx: &<::Decide as DeciderWithContext>::Ctx, + cmd: &<::Decide as DeciderWithContext>::Cmd, + retrys: Option, + ) -> Result< + Vec<::Evt>, + LoadDecideAppendError<::Err, RepoErr>, + > + where + RepoErr: Debug, + StreamId: Clone + StreamIdFromEvent<<::Decide as Evolver>::Evt>, + { + let (mut decider_evts, mut version) = match stream_id { + StreamState::New => (vec![], RepositoryVersion::NoStream), + StreamState::Existing(sid) => event_repository + .load(Some(sid)) + .await + .map_err(Self::to_lda_error)?, + }; + + let mut state = initial; + + for r in 1..retrys.unwrap_or(20) { + state = decider_evts + .iter() + .fold(state, ::evolve); + + let new_evts = ::decide(ctx, &state, cmd) + .map_err(LoadDecideAppendError::DecideErr)?; + + let stream = match stream_id { + StreamState::New => match new_evts.first() { + None => { + return Ok(vec![]); + } + Some(evt) => StreamId::from(evt.clone()), + }, + StreamState::Existing(sid) => sid.clone(), + }; + + match event_repository.append(&version, &stream, &new_evts).await { + Ok((appended_evts, _)) => return Ok(appended_evts), + Err(VersionedRepositoryError::RepoErr(e)) => { + println!("Max Retries for {:?}!!", &cmd); + return Err(LoadDecideAppendError::RepositoryErr(e)); + } + Err(VersionedRepositoryError::VersionConflict(_)) => { + println!("RETRY #{} for {:?}!!", &r, &cmd); + thread::sleep(time::Duration::new(0, 100000000 * r)); + let (mut catchup_evts, new_version) = event_repository + .load_from_version(&version, Some(&stream)) + .await + .map_err(Self::to_lda_error)?; + + version = new_version; + decider_evts.append(&mut catchup_evts); + } + }; + } + + Err(LoadDecideAppendError::OccMaxRetries) + } +} + +#[async_trait(?Send)] +pub trait ReifyDecideSave +where + <::Decide as Evolver>::State: Clone, + <::Decide as DeciderWithContext>::Cmd: Debug, +{ + type Decide: DeciderWithContext; + + async fn execute_reify_decide<'a, RepoErr: Send>( + state_repository: &mut (impl VersionedStateRepository< + 'a, + ::State, + RepoErr, + >), + ctx: &<::Decide as DeciderWithContext>::Ctx, + cmd: &<::Decide as DeciderWithContext>::Cmd, + retrys: Option, + ) -> Result< + ::State, + ReifyDecideSaveError<::Err, RepoErr>, + > { + let (mut state, mut version) = state_repository + .reify() + .await + .map_err(ReifyDecideSaveError::RepositoryErr)?; + + for r in 1..retrys.unwrap_or(20) { + let local_state = state.clone(); + let evts = ::decide(ctx, &local_state, cmd) + .map_err(ReifyDecideSaveError::DecideErr)?; + + let new_state = evts + .iter() + .fold(local_state, ::evolve); + + match state_repository.save(&version, &new_state).await { + Ok(s) => return Ok(s), + Err(VersionedRepositoryError::RepoErr(e)) => { + return Err(ReifyDecideSaveError::RepositoryErr(e)) + } + Err(VersionedRepositoryError::VersionConflict(_)) => { + println!("Retry #{} for {:?} - Reload State", &r, &cmd); + (state, version) = state_repository + .reify() + .await + .map_err(ReifyDecideSaveError::RepositoryErr)?; + } + } + } + + Err(ReifyDecideSaveError::OccMaxRetries) + } +} + +#[derive(Debug)] +pub struct CommandResponse( + ::Cmd, + Vec<::Evt>, + ::State, +); + +#[async_trait(?Send)] +pub trait DecideEvolveWithCommandResponse +where + ::State: Clone, +{ + type Decide: DeciderWithContext; + + async fn response( + cmd: <::Decide as DeciderWithContext>::Cmd, + state: &<::Decide as Evolver>::State, + ctx: &<::Decide as DeciderWithContext>::Ctx, + ) -> Result< + CommandResponse<::Decide>, + <::Decide as DeciderWithContext>::Err, + > { + let evts = ::decide(ctx, state, &cmd)?; + let state = evts + .iter() + .fold(state.to_owned(), ::evolve); + + Ok(CommandResponse(cmd, evts, state)) + } +} + +pub enum StreamState { + New, + Existing(T), +} + +#[derive(Debug)] +pub enum LoadDecideAppendError { + OccMaxRetries, + VersionError, + DecideErr(DecideErr), + RepositoryErr(RepoErr), +} + +#[derive(Debug)] +pub enum ReifyDecideSaveError { + OccMaxRetries, + DecideErr(DecideErr), + RepositoryErr(RepoErr), +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use assert_matches::assert_matches; + + use crate::{ + decider::Event, + repository::in_memory::{ + state::versioned::InMemoryStateRepository, + versioned_with_streams::InMemoryEventRepository, + }, + test_helpers::{ + deciders::user::{ + User, UserCommand, UserDecider, UserDeciderCtx, UserDeciderError, UserDeciderState, + UserEvent, UserFieldError, UserName, + }, + ValueType, + }, + }; + + use super::*; + + #[actix_rt::test] + async fn load_decide_append_basic_function() { + let ctx = UserDeciderCtx::new(); + + let mut event_repository = InMemoryEventRepository::::new("test"); + + let cmd1 = UserCommand::AddUser("Mike".to_string()); + + let evts = UserDecider::execute( + UserDeciderState::default(), + &mut event_repository, + &StreamState::New, + &ctx, + &cmd1, + None, + ) + .await + .expect("command_succeeds"); + + let first_id = evts.first().unwrap().get_id(); + + assert_matches!( + evts.first().expect("one event"), + UserEvent::UserAdded(User { id, name, .. }) if (&first_id == id) && (name.value() == "Mike".to_string()) + ); + + let state = UserDeciderState::load_by_id( + UserDeciderState::default(), + &event_repository, + &first_id.to_string(), + ) + .await + .expect("state is loaded"); + + assert_matches!( + state, + UserDeciderState { users } if users == HashMap::from([(first_id.clone(), User::new(first_id, UserName::try_from("Mike".to_string()).unwrap()))]) + ); + + let cmd2 = UserCommand::AddUser("Dmitiry".to_string()); + let evts = UserDecider::execute( + UserDeciderState::default(), + &mut event_repository, + &StreamState::New, + &ctx, + &cmd2, + None, + ) + .await + .expect("command_succeeds"); + + let second_id = evts.first().unwrap().get_id(); + + assert_matches!( + evts.first().expect("one event"), + UserEvent::UserAdded(User { id, name, .. }) if (&second_id == id) && (name.value() == "Dmitiry".to_string()) + ); + + let state = UserDeciderState::load_by_id( + UserDeciderState::default(), + &event_repository, + &second_id.to_string(), + ) + .await + .expect("state is loaded"); + + assert_matches!( + state, + UserDeciderState { users } if users == HashMap::from([(second_id.clone(), User::new(second_id, UserName::try_from("Dmitiry".to_string()).unwrap()))]) + ); + + let cmd3 = UserCommand::UpdateUserName(second_id.clone(), "Dmitiry2".to_string()); + let evts = UserDecider::execute( + UserDeciderState::default(), + &mut event_repository, + &StreamState::Existing(second_id.to_string()), + &ctx, + &cmd3, + None, + ) + .await + .expect("command_succeeds"); + + assert_matches!( + evts.first().expect("one event"), + UserEvent::UserNameUpdated(id, name) if (id == &second_id) && (name == &UserName::try_from("Dmitiry2".to_string()).unwrap()) + ); + + let state = UserDeciderState::load_by_id( + UserDeciderState::default(), + &event_repository, + &second_id.to_string(), + ) + .await + .expect("state is loaded"); + + assert_matches!( + state, + UserDeciderState { users } if users == HashMap::from([(second_id.clone(), User::new(second_id, UserName::try_from("Dmitiry2".to_string()).unwrap()))]) + ); + + let cmd4 = + UserCommand::UpdateUserName(second_id.clone(), "DmitiryWayToLongToSucceed".to_string()); + + let res = UserDecider::execute( + UserDeciderState::default(), + &mut event_repository, + &StreamState::Existing(second_id.to_string()), + &ctx, + &cmd4, + None, + ) + .await; + + assert_matches!( + res, + Err(LoadDecideAppendError::DecideErr(UserDeciderError::UserField(UserFieldError::NameToLong(n)))) if n == "DmitiryWayToLongToSucceed".to_string() + ); + + let state = UserDeciderState::load_by_id( + UserDeciderState::default(), + &event_repository, + &second_id.to_string(), + ) + .await + .expect("state is loaded"); + + assert_matches!( + state, + UserDeciderState { users } if users == HashMap::from([(second_id.clone(), User::new(second_id, UserName::try_from("Dmitiry2".to_string()).unwrap()))]) + ); + + let state = UserDeciderState::load(UserDeciderState::default(), &event_repository) + .await + .expect("state is loaded"); + + assert_matches!( + state, + UserDeciderState { users } if users == HashMap::from([ + (first_id.clone(), User::new(first_id, UserName::try_from("Mike".to_string()).unwrap())), + (second_id.clone(), User::new(second_id, UserName::try_from("Dmitiry2".to_string()).unwrap())) + ]) + ); + } + + #[actix_rt::test] + async fn decide_evolve_with_command_response() { + let ctx = UserDeciderCtx::new(); + let state = UserDeciderState::default(); + + let cmd1 = UserCommand::AddUser("Mike".to_string()); + let res = UserDecider::response(cmd1, &state, &ctx).await; + + assert_matches!(res, Ok(CommandResponse(UserCommand::AddUser(_), _, _))); + } + + #[actix_rt::test] + async fn reify_decide_save_basic_functionality() { + let ctx = UserDeciderCtx::new(); + + let mut state_repository = + InMemoryStateRepository::::new(UserDeciderState::default()); + + let cmd1 = UserCommand::AddUser("Mike".to_string()); + + let res = UserDecider::execute_reify_decide(&mut state_repository, &ctx, &cmd1, None) + .await + .unwrap(); + + assert_eq!(res.users.len(), 1); + } +} diff --git a/src/test_helpers/deciders.rs b/src/test_helpers/deciders.rs new file mode 100644 index 0000000..2f8ebae --- /dev/null +++ b/src/test_helpers/deciders.rs @@ -0,0 +1,345 @@ +pub(crate) mod user { + use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, Mutex}, + }; + + use autoincrement::{AsyncIncrement, AsyncIncremental}; + use serde::{Deserialize, Serialize}; + use thiserror::Error; + + use crate::{ + decider::{Decider, DeciderWithContext, Event, Evolver}, + repository::event::StreamIdFromEvent, + strategies::{ + DecideEvolveWithCommandResponse, LoadDecideAppend, ReifyDecideSave, + StateFromEventRepository, + }, + test_helpers::ValueType, + }; + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub(crate) struct User { + pub id: UserId, + pub name: UserName, + pub guitars: HashSet, + } + + impl User { + pub fn new(id: UserId, name: UserName) -> Self { + Self { + id, + name, + guitars: HashSet::new(), + } + } + } + + pub(crate) type UserId = usize; + + pub(crate) type UnvalidatedUserName = String; + + #[derive(Debug, Clone, Hash, Serialize, Deserialize, PartialEq, Eq)] + pub(crate) struct Guitar { + pub brand: String, + } + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub(crate) struct UserName(String); + + impl TryFrom for UserName { + type Error = UserFieldError; + + fn try_from(value: String) -> Result { + let len = value.len(); + if len < 1 { + Err(UserFieldError::EmptyName) + } else if len > 10 { + Err(UserFieldError::NameToLong(value.to_owned())) + } else { + Ok(Self(value.to_owned())) + } + } + } + + impl TryFrom<&str> for UserName { + type Error = UserFieldError; + + fn try_from(value: &str) -> Result { + let len = value.len(); + if len < 1 { + Err(UserFieldError::EmptyName) + } else if len > 10 { + Err(UserFieldError::NameToLong(value.to_owned())) + } else { + Ok(Self(value.to_owned())) + } + } + } + + impl TryFrom<&String> for UserName { + type Error = UserFieldError; + + fn try_from(value: &String) -> Result { + let len = value.len(); + if len < 1 { + Err(UserFieldError::EmptyName) + } else if len > 10 { + Err(UserFieldError::NameToLong(value.to_owned())) + } else { + Ok(Self(value.to_owned())) + } + } + } + + impl ValueType for UserName { + fn value(&self) -> String { + self.0.to_owned() + } + } + + #[derive(Debug, Error)] + pub(crate) enum UserFieldError { + #[error("Username cannot be empty")] + EmptyName, + #[error("Username {0} is to long")] + NameToLong(String), + } + + #[derive(Debug)] + pub(crate) struct UserDecider; + + impl Decider for UserDecider { + type Cmd = UserCommand; + + type Err = UserDeciderError; + + fn decide( + state: &UserDeciderState, + cmd: &UserCommand, + ) -> Result, UserDeciderError> { + match cmd { + UserCommand::AddUser(user_name) => { + let name = UserName::try_from(user_name) + .map_err(|e| UserDeciderError::UserField(e))?; + + Ok(vec![UserEvent::UserAdded(User { + id: 1, + name, + guitars: HashSet::new(), + })]) + } + UserCommand::UpdateUserName(user_id, user_name) => { + let name = UserName::try_from(user_name) + .map_err(|e| UserDeciderError::UserField(e))?; + + Ok(vec![UserEvent::UserNameUpdated(user_id.to_owned(), name)]) + } + UserCommand::AddGuitar(user_id, guitar) => { + println!("ADD GUITAR STATE: {:?}", &state); + let user = state + .users + .get(&user_id) + .ok_or(UserDeciderError::NotFound(*user_id))?; + if user.guitars.contains(&guitar) { + Err(UserDeciderError::AlreadyHasGuitar(guitar.to_owned())) + } else { + Ok(vec![UserEvent::UserGuitarAdded( + *user_id, + guitar.to_owned(), + )]) + } + } + } + } + } + + impl Evolver for UserDecider { + type State = UserDeciderState; + + type Evt = UserEvent; + + fn evolve(mut state: UserDeciderState, event: &UserEvent) -> UserDeciderState { + match event { + UserEvent::UserAdded(user) => { + state.users.insert(user.id.to_owned(), user.to_owned()); + state + } + UserEvent::UserNameUpdated(user_id, user_name) => { + state.users.get_mut(&user_id).unwrap().name = user_name.to_owned(); + state + } + UserEvent::UserGuitarAdded(user_id, guitar) => { + state + .users + .get_mut(&user_id) + .unwrap() + .guitars + .insert(guitar.to_owned()); + state + } + } + } + } + + impl DeciderWithContext for UserDecider { + type Ctx = UserDeciderCtx; + + type Cmd = UserCommand; + + type Err = UserDeciderError; + + fn decide( + ctx: &UserDeciderCtx, + state: &UserDeciderState, + cmd: &UserCommand, + ) -> Result, UserDeciderError> { + match cmd { + UserCommand::AddUser(user_name) => { + let seq = ctx.id_sequence.lock().unwrap(); + let IdGen(id) = seq.pull(); + + let name = UserName::try_from(user_name) + .map_err(|e| UserDeciderError::UserField(e))?; + + Ok(vec![UserEvent::UserAdded(User { + id, + name, + guitars: HashSet::new(), + })]) + } + UserCommand::UpdateUserName(user_id, user_name) => { + let name = UserName::try_from(user_name) + .map_err(|e| UserDeciderError::UserField(e))?; + + Ok(vec![UserEvent::UserNameUpdated(user_id.to_owned(), name)]) + } + UserCommand::AddGuitar(user_id, guitar) => { + let user = state + .users + .get(&user_id) + .ok_or(UserDeciderError::NotFound(*user_id))?; + if user.guitars.contains(&guitar) { + Err(UserDeciderError::AlreadyHasGuitar(guitar.to_owned())) + } else { + Ok(vec![UserEvent::UserGuitarAdded( + *user_id, + guitar.to_owned(), + )]) + } + } + } + } + } + + impl LoadDecideAppend for UserDecider { + type Decide = Self; + } + + impl DecideEvolveWithCommandResponse for UserDecider { + type Decide = Self; + } + + impl ReifyDecideSave for UserDecider { + type Decide = Self; + } + + #[derive(Debug, Clone, PartialEq, Eq)] + pub(crate) struct UserDeciderState { + pub(crate) users: HashMap, + } + + impl UserDeciderState { + pub fn new(users: HashMap) -> Self { + Self::default().set_users(users) + } + + pub fn set_users(&self, users: HashMap) -> Self { + Self { users, ..*self } + } + } + + impl Default for UserDeciderState { + fn default() -> Self { + Self { + users: HashMap::new().to_owned(), + } + } + } + + #[derive(Clone, Debug)] + pub(crate) struct UserDeciderCtx { + id_sequence: Arc>>, + } + + impl UserDeciderCtx { + pub fn new() -> Self { + Self { + id_sequence: Arc::new(Mutex::new(IdGen::init())), + } + } + + pub fn current(&self) -> usize { + let IdGen(id) = self.id_sequence.lock().unwrap().current(); + id + } + } + + impl StateFromEventRepository for UserDeciderState { + type Ev = UserDecider; + } + + #[derive(Debug)] + pub(crate) enum UserCommand { + AddUser(UnvalidatedUserName), + UpdateUserName(UserId, UnvalidatedUserName), + AddGuitar(UserId, Guitar), + } + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] + pub(crate) enum UserEvent { + UserAdded(User), + UserNameUpdated(UserId, UserName), + UserGuitarAdded(UserId, Guitar), + } + + impl Event for UserEvent { + type EntityId = UserId; + + fn event_type(&self) -> String { + match self { + UserEvent::UserAdded(_) => "UserAdded".to_string(), + UserEvent::UserNameUpdated(_, _) => "UserNameUpdated".to_string(), + UserEvent::UserGuitarAdded(_, _) => "UserGuitarAdded".to_string(), + } + } + + fn get_id(&self) -> Self::EntityId { + match self { + UserEvent::UserAdded(User { id, .. }) => id, + UserEvent::UserNameUpdated(id, _) => id, + UserEvent::UserGuitarAdded(id, _) => id, + } + .to_owned() + } + } + + impl StreamIdFromEvent for String { + fn event_entity_id_into(id: ::EntityId) -> Self { + id.to_string() + } + } + + #[derive(Debug, Error)] + pub(crate) enum UserDeciderError { + #[error("Invalid user field {0:?}")] + UserField(UserFieldError), + #[error("User id {0} not found")] + NotFound(UserId), + #[error("Already has guitar {0:?}")] + AlreadyHasGuitar(Guitar), + } + + #[derive(AsyncIncremental, PartialEq, Eq, Clone, Debug)] + pub struct IdGen(usize); +} diff --git a/src/test_helpers/mod.rs b/src/test_helpers/mod.rs new file mode 100644 index 0000000..6a2af23 --- /dev/null +++ b/src/test_helpers/mod.rs @@ -0,0 +1,6 @@ +pub(crate) mod deciders; +pub(crate) mod repository; + +pub(crate) trait ValueType { + fn value(&self) -> T; +} diff --git a/src/test_helpers/repository.rs b/src/test_helpers/repository.rs new file mode 100644 index 0000000..72987ab --- /dev/null +++ b/src/test_helpers/repository.rs @@ -0,0 +1,129 @@ +use core::time; +use std::{collections::HashMap, fmt::Debug, thread}; + +use assert_matches::assert_matches; + +use crate::{ + repository::{ + event::VersionedEventRepositoryWithStreams, state::VersionedStateRepository, + RepositoryVersion, + }, + test_helpers::deciders::user::{User, UserId, UserName}, +}; + +use super::deciders::user::{UserDeciderState, UserEvent}; + +pub(crate) async fn test_versioned_event_repository_with_streams<'a, Err: Debug + Send + Sync>( + mut event_repository: impl VersionedEventRepositoryWithStreams< + 'a, + UserEvent, + Err, + StreamId = String, + >, +) { + println!("RUNNING UNIVERSAL SPEC TEST FOR VersionedEventRepositoryWithStreams"); + let id_1 = "1".to_string(); + let id_2 = "2".to_string(); + + let res: (Vec, RepositoryVersion) = + event_repository.load(None).await.expect("loaded"); + assert_matches!(res, (v, _) if v == vec![] as Vec); + + let events1 = vec![ + UserEvent::UserAdded(User::new( + 1, + UserName::try_from("Mike").expect("Name is valid"), + )), + UserEvent::UserNameUpdated( + 1 as UserId, + UserName::try_from("Mike2").expect("Name is valid"), + ), + ]; + + let _ = event_repository + .append(&RepositoryVersion::Any, &id_1, &events1) + .await + .expect("Successful append"); + + let events2 = vec![ + UserEvent::UserAdded(User::new( + 2, + UserName::try_from("Stella").expect("Name is valid"), + )), + UserEvent::UserNameUpdated( + 1 as UserId, + UserName::try_from("Stella2").expect("Name is valid"), + ), + ]; + + let _ = event_repository + .append(&RepositoryVersion::Any, &id_2, &events2) + .await + .expect("Successful append"); + + // Crude but we need to wait for ESDB to catch up its "Categories" auto projection + thread::sleep(time::Duration::from_secs(1)); + + let res = event_repository.load(Some(&id_1)).await; + assert_matches!(res, Ok((v, RepositoryVersion::Exact(_))) if v == events1); + + let res = event_repository.load(Some(&id_2)).await; + assert_matches!(res, Ok((v, RepositoryVersion::Exact(_))) if v == events2); + + let res = event_repository.load(None).await; + + let events_combined: Vec = events1.into_iter().chain(events2.into_iter()).collect(); + assert_matches!(res, Ok((v, RepositoryVersion::Exact(_))) if v == events_combined); + + let res = event_repository.load(Some(&id_1)).await; + let version = res.unwrap().1; + + let new_events = vec![UserEvent::UserNameUpdated( + 1, + UserName::try_from("Mike").expect("Name is valid"), + )]; + + let res = event_repository + .append(&version, &id_1, &new_events) + .await + .expect("Success"); + + let version = res.1; + + let (latest_events, _) = event_repository + .load_from_version(&version, Some(&id_1)) + .await + .expect("load success"); + + assert_eq!(latest_events.first().unwrap(), new_events.first().unwrap()); +} + +pub(crate) async fn test_versioned_state_repository<'a, Err: Debug + Send + Sync>( + mut state_repository: impl VersionedStateRepository< + 'a, + UserDeciderState, + Err, + Version = RepositoryVersion, + >, +) { + let new_state = UserDeciderState::new(HashMap::from([( + 1, + User::new(1, UserName::try_from("Mike").expect("valid")), + )])); + + let version = RepositoryVersion::Exact(0); + println!("Saving: state={:?}, version={:?}", &new_state, &version); + let _ = state_repository + .save(&version, &new_state) + .await + .expect("Success"); + println!("State saved"); + + assert_eq!( + state_repository.reify().await.expect("Success"), + (new_state.to_owned(), RepositoryVersion::Exact(1)) + ); + + let res = state_repository.save(&version, &new_state).await; + assert_matches!(res, Err(_)); +}