Implementing the Event Sourcing and CQRS patterns with RustAPI for audit trails and complex domain logic.
📖 Related: Domain-Driven Design patterns
This example demonstrates:
- Event Sourcing — Store all changes as events, not just current state
- CQRS — Command Query Responsibility Segregation
- Aggregate Reconstruction — Rebuild state from event history
- Domain Events — Type-safe event definitions
- Concurrent Event Store — Thread-safe with DashMap
- Rust 1.70+
- Understanding of crud-api patterns
- Basic knowledge of Event Sourcing concepts
| Feature | Description |
|---|---|
| Event Sourcing | Immutable event log |
| CQRS | Separate read/write models |
| DashMap | Concurrent state storage |
| Domain Events | Typed event variants |
| Aggregate Pattern | State reconstruction |
# Run the example
cargo run -p event-sourcing
# Server starts at http://127.0.0.1:8080This example models a simple bank account with:
- Account opening
- Deposits
- Withdrawals
All operations are stored as events, enabling full audit trail and state reconstruction.
| Method | Path | Description |
|---|---|---|
| POST | /accounts/{id} |
Execute command (open/deposit/withdraw) |
| GET | /accounts/{id} |
Get current account state |
| GET | /accounts/{id}/events |
Get event history |
curl -X POST http://127.0.0.1:8080/accounts/acc-001 \
-H "Content-Type: application/json" \
-d '{
"type": "OpenAccount",
"owner": "Alice",
"initial_balance": 1000.0
}'curl -X POST http://127.0.0.1:8080/accounts/acc-001 \
-H "Content-Type: application/json" \
-d '{
"type": "Deposit",
"amount": 500.0
}'curl -X POST http://127.0.0.1:8080/accounts/acc-001 \
-H "Content-Type: application/json" \
-d '{
"type": "Withdraw",
"amount": 200.0
}'curl http://127.0.0.1:8080/accounts/acc-001
# Response:
# {
# "id": "acc-001",
# "owner": "Alice",
# "balance": 1300.0,
# "version": 3
# }curl http://127.0.0.1:8080/accounts/acc-001/events
# Response:
# [
# {"type": "AccountOpened", "owner": "Alice", "initial_balance": 1000.0},
# {"type": "MoneyDeposited", "amount": 500.0},
# {"type": "MoneyWithdrawn", "amount": 200.0}
# ]#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum BankEvent {
AccountOpened { owner: String, initial_balance: f64 },
MoneyDeposited { amount: f64 },
MoneyWithdrawn { amount: f64 },
}Events are immutable facts about what happened. The #[serde(tag = "type")] adds a discriminator for JSON.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum BankCommand {
OpenAccount { owner: String, initial_balance: f64 },
Deposit { amount: f64 },
Withdraw { amount: f64 },
}Commands express intent — they may be rejected if business rules are violated.
#[derive(Debug, Clone, Default)]
struct BankAccount {
id: String,
owner: String,
balance: f64,
version: u64, // Event version for optimistic concurrency
}
impl BankAccount {
fn apply(&mut self, event: &BankEvent) {
match event {
BankEvent::AccountOpened { owner, initial_balance } => {
self.owner = owner.clone();
self.balance = *initial_balance;
}
BankEvent::MoneyDeposited { amount } => {
self.balance += amount;
}
BankEvent::MoneyWithdrawn { amount } => {
self.balance -= amount;
}
}
self.version += 1;
}
}#[derive(Clone)]
struct EventStore {
events: Arc<DashMap<String, Vec<BankEvent>>>,
}
impl EventStore {
async fn append(&self, aggregate_id: &str, event: BankEvent) {
self.events
.entry(aggregate_id.to_string())
.or_default()
.push(event);
}
async fn load(&self, aggregate_id: &str) -> Option<BankAccount> {
let events = self.events.get(aggregate_id)?;
let mut account = BankAccount {
id: aggregate_id.to_string(),
..Default::default()
};
// Reconstruct state by replaying events
for event in events.iter() {
account.apply(event);
}
Some(account)
}
}async fn handle_command(
State(state): State<AppState>,
Path(id): Path<String>,
Json(cmd): Json<BankCommand>,
) -> Result<Json<BankAccount>, ApiError> {
let mut account = state.event_store.load(&id).await
.unwrap_or_default();
// Validate command and produce event
let event = match cmd {
BankCommand::Withdraw { amount } => {
if account.balance < amount {
return Err(ApiError::bad_request("Insufficient funds"));
}
BankEvent::MoneyWithdrawn { amount }
}
// ... other commands
};
// Store event
state.event_store.append(&id, event.clone()).await;
// Apply to get new state
account.apply(&event);
Ok(Json(account))
}| Benefit | Description |
|---|---|
| Audit Trail | Complete history of all changes |
| Time Travel | Reconstruct state at any point |
| Debugging | See exactly what happened |
| Event Replay | Rebuild read models, fix bugs |
| Analytics | Rich historical data |
┌─────────────┐ ┌─────────────┐
│ Command │────▶│ Events │
│ Handler │ │ (Write) │
└─────────────┘ └──────┬──────┘
│
▼
┌─────────────┐ ┌─────────────┐
│ Query │◀────│ Projector │
│ Handler │ │ (Read) │
└─────────────┘ └─────────────┘
// Events are the source of truth
let events = vec![
BankEvent::AccountOpened { owner: "Alice", initial_balance: 100.0 },
BankEvent::MoneyDeposited { amount: 50.0 },
BankEvent::MoneyWithdrawn { amount: 30.0 },
];
// Current state = fold(events)
let account = events.iter().fold(BankAccount::default(), |mut acc, e| {
acc.apply(e);
acc
});
// balance = 120.0[dependencies]
rustapi-rs = { version = "0.2" }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
dashmap = "5.5"
uuid = { version = "1", features = ["v4"] }For production, replace DashMap with a proper event store:
- PostgreSQL with event table
- EventStoreDB
- Apache Kafka
For aggregates with many events, use snapshots:
struct Snapshot {
aggregate_id: String,
version: u64,
state: BankAccount,
}
// Load from snapshot + recent events only
fn load_with_snapshot(id: &str) -> BankAccount {
let snapshot = get_latest_snapshot(id);
let events = get_events_since(id, snapshot.version);
let mut account = snapshot.state;
for event in events {
account.apply(&event);
}
account
}Handle schema changes with upcasting:
enum BankEventV1 { ... }
enum BankEventV2 { ... } // New version
fn upcast(v1: BankEventV1) -> BankEventV2 {
// Transform old events to new format
}- microservices — Distributed architecture
- proof-of-concept — Full application
- LEARNING_PATH.md — Learning progression
- RustAPI Cookbook