diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f6c017..7ae90e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. Each entry lists the date and the crate versions that were released. +## 2026-05-03 — mqdb-cluster 0.3.3 + +### Fixed + +- Unified `import_*` error handling across all six DB stores. Previously `IndexStore::import_entries` propagated `SerializationError` on a malformed entry while the other five (`DbDataStore`, `SchemaStore`, `ConstraintStore`, `UniqueStore`, `FkValidationStore`) silently skipped — a corrupt snapshot stream would either truncate or abort depending on which store hit the bad bytes first. All six now propagate `SerializationError` on deserialize failure. +- Fixed `IndexStore::import_entries` counter accuracy. It previously incremented `imported` even when `add_entry` returned `AlreadyExists`, inflating the count on idempotent snapshot replay. Now only successful inserts are counted; `AlreadyExists` is treated as benign replay and silently swallowed. + ## 2026-04-25 — mqdb-cluster 0.3.2 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index f955039..19c57c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1411,7 +1411,7 @@ dependencies = [ [[package]] name = "mqdb-cluster" -version = "0.3.2" +version = "0.3.3" dependencies = [ "arc-swap", "bebytes", diff --git a/crates/mqdb-cluster/Cargo.toml b/crates/mqdb-cluster/Cargo.toml index 6264d0e..14b0635 100644 --- a/crates/mqdb-cluster/Cargo.toml +++ b/crates/mqdb-cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqdb-cluster" -version = "0.3.2" +version = "0.3.3" publish = false edition.workspace = true license = "AGPL-3.0-only" diff --git a/crates/mqdb-cluster/src/cluster/db/constraint_store.rs b/crates/mqdb-cluster/src/cluster/db/constraint_store.rs index 4da218c..73b427c 100644 --- a/crates/mqdb-cluster/src/cluster/db/constraint_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/constraint_store.rs @@ -458,7 +458,8 @@ impl ConstraintStore { /// Panics if the internal lock is poisoned. /// /// # Errors - /// Returns `SerializationError` if a key or constraint cannot be deserialized. + /// Returns `SerializationError` if a key fails UTF-8 parsing or a + /// constraint fails to deserialize. pub fn import_constraints(&self, data: &[u8]) -> Result { if data.len() < 4 { return Ok(0); @@ -499,13 +500,13 @@ impl ConstraintStore { let constraint_bytes = &data[offset..offset + data_len]; offset += data_len; - if let Some(constraint) = Self::deserialize(constraint_bytes) { - self.constraints - .write() - .unwrap() - .insert(key.to_string(), constraint); - imported += 1; - } + let constraint = Self::deserialize(constraint_bytes) + .ok_or(ConstraintStoreError::SerializationError)?; + self.constraints + .write() + .unwrap() + .insert(key.to_string(), constraint); + imported += 1; } Ok(imported) diff --git a/crates/mqdb-cluster/src/cluster/db/data_store.rs b/crates/mqdb-cluster/src/cluster/db/data_store.rs index 3ffe827..b163536 100644 --- a/crates/mqdb-cluster/src/cluster/db/data_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/data_store.rs @@ -412,7 +412,8 @@ impl DbDataStore { /// Panics if the internal lock is poisoned. /// /// # Errors - /// Returns `SerializationError` if UTF-8 parsing fails. + /// Returns `SerializationError` if a key fails UTF-8 parsing or an entity + /// fails to deserialize. pub fn import_entities(&self, data: &[u8]) -> Result { if data.len() < 4 { return Ok(0); @@ -453,10 +454,10 @@ impl DbDataStore { let entity_data = &data[offset..offset + data_len]; offset += data_len; - if let Some(entity) = Self::deserialize(entity_data) { - self.write_entities().insert(key.to_string(), entity); - imported += 1; - } + let entity = + Self::deserialize(entity_data).ok_or(DbDataStoreError::SerializationError)?; + self.write_entities().insert(key.to_string(), entity); + imported += 1; } Ok(imported) diff --git a/crates/mqdb-cluster/src/cluster/db/fk_store.rs b/crates/mqdb-cluster/src/cluster/db/fk_store.rs index 06c0ccf..3faea81 100644 --- a/crates/mqdb-cluster/src/cluster/db/fk_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/fk_store.rs @@ -298,7 +298,8 @@ impl FkValidationStore { /// Panics if the internal lock is poisoned. /// /// # Errors - /// Returns `SerializationError` if a key or request cannot be deserialized. + /// Returns `SerializationError` if a key fails UTF-8 parsing or a request + /// fails to deserialize. pub fn import_requests(&self, data: &[u8]) -> Result { if data.len() < 4 { return Ok(0); @@ -339,13 +340,13 @@ impl FkValidationStore { let req_bytes = &data[offset..offset + data_len]; offset += data_len; - if let Some(request) = Self::deserialize_request(req_bytes) { - self.pending_requests - .write() - .unwrap() - .insert(key.to_string(), request); - imported += 1; - } + let request = + Self::deserialize_request(req_bytes).ok_or(FkStoreError::SerializationError)?; + self.pending_requests + .write() + .unwrap() + .insert(key.to_string(), request); + imported += 1; } Ok(imported) diff --git a/crates/mqdb-cluster/src/cluster/db/index_store.rs b/crates/mqdb-cluster/src/cluster/db/index_store.rs index 39402fc..c9f1686 100644 --- a/crates/mqdb-cluster/src/cluster/db/index_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/index_store.rs @@ -307,7 +307,9 @@ impl IndexStore { /// Panics if the internal lock is poisoned. /// /// # Errors - /// Returns `SerializationError` if an entry cannot be deserialized. + /// Returns `SerializationError` if an entry fails to deserialize. + /// Entries that already exist on the destination are silently ignored + /// (idempotent snapshot replay) and not counted. pub fn import_entries(&self, data: &[u8]) -> Result { if data.len() < 4 { return Ok(0); @@ -337,8 +339,11 @@ impl IndexStore { let entry = Self::deserialize(entry_bytes).ok_or(IndexStoreError::SerializationError)?; - self.add_entry(entry).ok(); - imported += 1; + match self.add_entry(entry) { + Ok(()) => imported += 1, + Err(IndexStoreError::AlreadyExists) => {} + Err(e) => return Err(e), + } } Ok(imported) @@ -614,4 +619,42 @@ mod tests { } } } + + #[test] + fn import_entries_is_idempotent_on_replay() { + let src = IndexStore::new(node(1)); + let dst = IndexStore::new(node(2)); + + for i in 0_u16..4 { + src.add_entry(IndexEntry::create( + "users", + "email", + format!("u{i}@x.com").as_bytes(), + partition(i), + &format!("u{i}"), + )) + .unwrap(); + } + + let target = src + .lookup("users", "email", b"u0@x.com") + .first() + .map(IndexEntry::index_partition) + .unwrap(); + let target_count = (0_u16..4) + .filter(|i| { + src.lookup("users", "email", format!("u{i}@x.com").as_bytes()) + .first() + .map(IndexEntry::index_partition) + == Some(target) + }) + .count(); + + let payload = src.export_for_partition(target); + let first = dst.import_entries(&payload).unwrap(); + let second = dst.import_entries(&payload).unwrap(); + + assert_eq!(first, target_count); + assert_eq!(second, 0); + } } diff --git a/crates/mqdb-cluster/src/cluster/db/schema_store.rs b/crates/mqdb-cluster/src/cluster/db/schema_store.rs index 3d7f3fc..2d9ae47 100644 --- a/crates/mqdb-cluster/src/cluster/db/schema_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/schema_store.rs @@ -269,7 +269,8 @@ impl SchemaStore { /// Panics if the internal lock is poisoned. /// /// # Errors - /// Returns `SerializationError` if a key or schema cannot be deserialized. + /// Returns `SerializationError` if a key fails UTF-8 parsing or a schema + /// fails to deserialize. pub fn import_schemas(&self, data: &[u8]) -> Result { if data.len() < 4 { return Ok(0); @@ -310,13 +311,13 @@ impl SchemaStore { let schema_bytes = &data[offset..offset + data_len]; offset += data_len; - if let Some(schema) = Self::deserialize(schema_bytes) { - self.schemas - .write() - .unwrap() - .insert(key.to_string(), schema); - imported += 1; - } + let schema = + Self::deserialize(schema_bytes).ok_or(SchemaStoreError::SerializationError)?; + self.schemas + .write() + .unwrap() + .insert(key.to_string(), schema); + imported += 1; } Ok(imported) diff --git a/crates/mqdb-cluster/src/cluster/db/unique_store.rs b/crates/mqdb-cluster/src/cluster/db/unique_store.rs index 5691dc6..9a89382 100644 --- a/crates/mqdb-cluster/src/cluster/db/unique_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/unique_store.rs @@ -386,7 +386,8 @@ impl UniqueStore { /// Panics if the internal lock is poisoned. /// /// # Errors - /// Returns `SerializationError` if a key or reservation cannot be deserialized. + /// Returns `SerializationError` if a key fails UTF-8 parsing or a + /// reservation fails to deserialize. pub fn import_reservations(&self, data: &[u8]) -> Result { if data.len() < 4 { return Ok(0); @@ -427,13 +428,13 @@ impl UniqueStore { let res_bytes = &data[offset..offset + data_len]; offset += data_len; - if let Some(reservation) = Self::deserialize(res_bytes) { - self.reservations - .write() - .unwrap() - .insert(key.to_string(), reservation); - imported += 1; - } + let reservation = + Self::deserialize(res_bytes).ok_or(UniqueStoreError::SerializationError)?; + self.reservations + .write() + .unwrap() + .insert(key.to_string(), reservation); + imported += 1; } Ok(imported)