Merged
15 changes: 14 additions & 1 deletion CHANGELOG.md
@@ -13,6 +13,19 @@ Each entry lists the date and the crate versions that were released.
- Routed every CLI bench/dev_bench connect through the new helper to close the same bug class for `mqdb bench db` (sync + async + cascade + unique + changefeed), `mqdb bench pubsub`, `mqdb dev bench` (db/pubsub/sub-pub), and the broker-readiness probes in both `bench/common.rs::wait_for_broker_ready` and `dev_bench/helpers.rs::wait_for_broker_ready`. Removed two now-redundant local `connect_client` helpers in `db_cascade.rs` and `db_changefeed.rs`. The `pubsub.rs` paths use custom `ConnectOptions` (clean-start, custom keep-alive) so their connect calls are wrapped inline with the same timeout pattern rather than going through the helper.
- Regression test `test_cli_connect_timeout_against_silent_listener` in `crates/mqdb-cli/tests/cli_test.rs` spawns a TCP listener that accepts the connection without speaking MQTT and asserts `mqdb list ... --timeout 2` exits within 5 seconds with a "timed out" error. Verified to fail on main (pre-fix exits at ~6s with "Connection reset by peer") and pass with the fix in place.
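
  The timeout-wrapping pattern these connects now share can be sketched with a stdlib-only, hypothetical `with_timeout` helper (not the crate's actual API): run the blocking operation on a worker thread and bound the wait on a channel, so a listener that accepts TCP but never speaks MQTT cannot hang the CLI past its deadline.

  ```rust
  use std::sync::mpsc;
  use std::thread;
  use std::time::Duration;

  // Hypothetical sketch of the "wrap the connect in a timeout" pattern:
  // run the blocking operation on a worker thread and bound the wait
  // with recv_timeout so a silent peer cannot stall us indefinitely.
  fn with_timeout<T, F>(op: F, deadline: Duration) -> Result<T, String>
  where
      T: Send + 'static,
      F: FnOnce() -> T + Send + 'static,
  {
      let (tx, rx) = mpsc::channel();
      thread::spawn(move || {
          // The receiver may have given up already; ignore the send error.
          let _ = tx.send(op());
      });
      rx.recv_timeout(deadline).map_err(|_| "timed out".to_string())
  }

  fn main() {
      // A fast "connect" completes within the deadline.
      assert_eq!(with_timeout(|| 42, Duration::from_secs(2)), Ok(42));
      // A silent-listener-style hang is cut off instead of blocking forever.
      let hung = with_timeout(
          || {
              thread::sleep(Duration::from_secs(5));
              0
          },
          Duration::from_millis(100),
      );
      assert_eq!(hung, Err("timed out".to_string()));
  }
  ```

  The real fix additionally distinguishes the custom `ConnectOptions` paths in `pubsub.rs`, which wrap their connect calls inline with the same pattern.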

## 2026-05-06 — mqdb-cluster 0.3.4

### Fixed

- **Partition snapshot import did not populate `FkReverseIndex`.** This was the "Known gap" called out in the 0.3.2 entry. After a rebalance-driven replica promotion, the new primary held the imported `db_data` records and FK constraints but its in-memory reverse-index cache (`(target_entity, target_id, source_entity, source_field) → {source_id, …}`) was empty for those records. `start_fk_reverse_lookup` and `handle_fk_reverse_lookup_request` would return empty for any record sitting on a newly-imported partition, causing ON DELETE CASCADE to miss children that the new primary owned and ON DELETE RESTRICT to silently allow deletes that should have been blocked.
- `StoreManager::import_partition` now calls a new private `rebuild_fk_indexes_after_import` step at the end of the import. It iterates every registered FK constraint and calls the existing `rebuild_fk_index_for_constraint` helper, which walks `db_data.list(source_entity)` (now populated with the just-imported records) and seeds the reverse index. Mirrors the existing pattern at `apply.rs:215` where constraint Insert via Raft replication triggers the same rebuild.
- Test coverage: 12 new tests (466 → 478 in the cluster lib). Direct `FkReverseIndex` unit tests in `data_store.rs` (insert/lookup/remove, idempotent inserts, removing unknown source ids, field-scoped keys), `update_fk_reverse_index` and `rebuild_fk_index_for_constraint` unit tests in `constraint_ops.rs` (Insert/Update/Delete paths, no-op when no constraints, malformed JSON, non-FK constraint), and a regression test `import_partition_rebuilds_fk_reverse_index` in `partition_io.rs`, verified to fail with the rebuild call disabled and pass with it restored — confirming the rebuild call is what makes the assertion pass.
- E2E in `examples/cluster-rebalance-stores/run.sh` now creates 20 extra child comments (2 per parent) spread across all 10 parents and adds a cascade-via-node-4 observation: deletes every parent through node 4 after rebalance, then prints how many of the eligible children were cascade-removed. Surfaced as an observation rather than a hard assertion because cascade outcomes through any specific node depend on whether that node has the FK constraint locally, which is governed by schema/constraint replication topology (separate concern; see below).
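
  The reverse-index shape described above — `(target_entity, target_id, source_entity, source_field) → {source_id, …}` — and why replaying imported records repopulates it can be modeled with a minimal stdlib sketch (illustrative only, not the crate's `FkReverseIndex` type):

  ```rust
  use std::collections::{BTreeSet, HashMap};

  // Illustrative stand-in for the reverse-index shape described above;
  // not the crate's actual FkReverseIndex implementation.
  #[derive(Default)]
  struct ReverseIndexSketch {
      // (target_entity, target_id, source_entity, source_field) → {source_id, …}
      map: HashMap<(String, String, String, String), BTreeSet<String>>,
  }

  impl ReverseIndexSketch {
      fn insert(&mut self, te: &str, ti: &str, se: &str, sf: &str, source_id: &str) {
          self.map
              .entry((te.to_string(), ti.to_string(), se.to_string(), sf.to_string()))
              .or_default()
              .insert(source_id.to_string());
      }

      fn lookup(&self, te: &str, ti: &str, se: &str, sf: &str) -> Vec<String> {
          self.map
              .get(&(te.to_string(), ti.to_string(), se.to_string(), sf.to_string()))
              .map(|ids| ids.iter().cloned().collect())
              .unwrap_or_default()
      }
  }

  fn main() {
      let mut idx = ReverseIndexSketch::default();
      // Rebuild-after-import boils down to replaying every imported child
      // record through insert; duplicate replays collapse into the set.
      idx.insert("posts", "p1", "comments", "post_id", "c2");
      idx.insert("posts", "p1", "comments", "post_id", "c1");
      idx.insert("posts", "p1", "comments", "post_id", "c1");
      assert_eq!(
          idx.lookup("posts", "p1", "comments", "post_id"),
          vec!["c1".to_string(), "c2".to_string()],
      );
      assert!(idx.lookup("posts", "p2", "comments", "post_id").is_empty());
  }
  ```

  With an empty map, cascade sees no children and restrict sees no blockers — exactly the failure mode the rebuild step closes.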

### Discovered while running the new E2E (separate follow-up)

- **Constraints don't reach all nodes uniformly.** Across runs of the new E2E, only the leader (node 1) consistently held both the unique and FK constraints locally; nodes 2/3 sometimes had a subset, and a freshly-joined node 4 had none. Because constraints route through `schema_partition(entity)`, any node that doesn't own that partition reaches the constraint only via forwarding — not in its local `db_constraints` store. The FkReverseIndex rebuild this PR adds is correct in its scope (it rebuilds for whatever constraints the importing node has locally), but a fully-correct cascade through every node requires constraints to be cluster-wide broadcast state. Tracked as future work alongside the schema replication topology issue first noted in the 0.3.2 CHANGELOG entry.
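
  Why forwarding is unavoidable here can be seen from a hedged sketch of the routing (the hash function and partition count are illustrative assumptions, not the crate's actual `schema_partition`): the constraint lives only on whichever node owns the partition the entity name hashes to.

  ```rust
  use std::collections::hash_map::DefaultHasher;
  use std::hash::{Hash, Hasher};

  // Illustrative stand-in for schema_partition(entity): a deterministic
  // hash of the entity name modulo the partition count. Only the owner
  // of the resulting partition holds the constraint in its local
  // db_constraints store; every other node must forward.
  fn schema_partition(entity: &str, partition_count: u64) -> u64 {
      let mut hasher = DefaultHasher::new();
      entity.hash(&mut hasher);
      hasher.finish() % partition_count
  }

  fn main() {
      let p = schema_partition("comments", 8);
      assert!(p < 8);
      // Routing is deterministic: every node computes the same owner,
      // but only that owner stores the constraint locally.
      assert_eq!(p, schema_partition("comments", 8));
  }
  ```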

## 2026-05-03 — mqdb-cluster 0.3.3

### Fixed
@@ -31,7 +44,7 @@ Each entry lists the date and the crate versions that were released.

### Known gaps not addressed here

-- `FkReverseIndex` (cluster-wide cache of child-to-parent FK references) is **not** included in partition snapshots. It's not partition-scoped — should be either rebuilt during import via the FK constraint discovery path or treated as a separate broadcast entity. Tracked as future work.
+- `FkReverseIndex` (cluster-wide cache of child-to-parent FK references) is **not** included in partition snapshots. It's not partition-scoped — should be either rebuilt during import via the FK constraint discovery path or treated as a separate broadcast entity. Tracked as future work. _Closed in 0.3.4 via the rebuild-during-import approach._

## 2026-04-25 — mqdb-cli 0.7.5

2 changes: 1 addition & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion crates/mqdb-cluster/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "mqdb-cluster"
-version = "0.3.3"
+version = "0.3.4"
publish = false
edition.workspace = true
license = "AGPL-3.0-only"
79 changes: 79 additions & 0 deletions crates/mqdb-cluster/src/cluster/db/data_store.rs
@@ -802,4 +802,83 @@ mod tests {
assert_eq!(results.len(), 1);
assert_eq!(results[0].id_str(), "alice");
}

#[test]
fn fk_reverse_index_insert_lookup_remove() {
let idx = FkReverseIndex::new();
idx.insert("posts", "p1", "comments", "post_id", "c1");
idx.insert("posts", "p1", "comments", "post_id", "c2");
idx.insert("posts", "p2", "comments", "post_id", "c3");

let mut p1_refs = idx.lookup("posts", "p1", "comments", "post_id");
p1_refs.sort();
assert_eq!(p1_refs, vec!["c1".to_string(), "c2".to_string()]);

assert_eq!(
idx.lookup("posts", "p2", "comments", "post_id"),
vec!["c3".to_string()],
);

assert!(
idx.lookup("posts", "missing", "comments", "post_id")
.is_empty(),
"lookup for unknown target must be empty",
);

idx.remove("posts", "p1", "comments", "post_id", "c1");
assert_eq!(
idx.lookup("posts", "p1", "comments", "post_id"),
vec!["c2".to_string()],
);

idx.remove("posts", "p1", "comments", "post_id", "c2");
assert!(
idx.lookup("posts", "p1", "comments", "post_id").is_empty(),
"removing the last source_id must leave the lookup empty",
);
}

#[test]
fn fk_reverse_index_insert_is_idempotent_per_source_id() {
let idx = FkReverseIndex::new();
idx.insert("posts", "p1", "comments", "post_id", "c1");
idx.insert("posts", "p1", "comments", "post_id", "c1");

assert_eq!(
idx.lookup("posts", "p1", "comments", "post_id"),
vec!["c1".to_string()],
"duplicate inserts of the same source_id must collapse",
);
}

#[test]
fn fk_reverse_index_remove_unknown_is_noop() {
let idx = FkReverseIndex::new();
idx.remove("posts", "p1", "comments", "post_id", "c1");
assert!(idx.lookup("posts", "p1", "comments", "post_id").is_empty());

idx.insert("posts", "p1", "comments", "post_id", "c1");
idx.remove("posts", "p1", "comments", "post_id", "c-other");
assert_eq!(
idx.lookup("posts", "p1", "comments", "post_id"),
vec!["c1".to_string()],
"removing a non-matching source_id must not affect existing entries",
);
}

#[test]
fn fk_reverse_index_keys_are_field_scoped() {
let idx = FkReverseIndex::new();
idx.insert("users", "u1", "posts", "author_id", "p1");
idx.insert("users", "u1", "posts", "editor_id", "p2");

assert_eq!(
idx.lookup("users", "u1", "posts", "author_id"),
vec!["p1".to_string()],
);
assert_eq!(
idx.lookup("users", "u1", "posts", "editor_id"),
vec!["p2".to_string()],
);
}
}
187 changes: 187 additions & 0 deletions crates/mqdb-cluster/src/cluster/store_manager/constraint_ops.rs
@@ -177,3 +177,190 @@ impl StoreManager {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::NodeId;
use crate::cluster::db::{ClusterConstraint, OnDeleteAction};

fn node(id: u16) -> NodeId {
NodeId::validated(id).unwrap()
}

fn fk_comments_to_posts() -> ClusterConstraint {
ClusterConstraint::foreign_key(
"comments",
"fk_comment_post",
"post_id",
"posts",
"id",
OnDeleteAction::Cascade,
)
}

#[test]
fn update_fk_reverse_index_inserts_on_create() {
let store = StoreManager::new(node(1));
store.db_constraints.add(fk_comments_to_posts()).unwrap();

store.update_fk_reverse_index(
Operation::Insert,
"comments",
"c1",
Some(b"{\"post_id\":\"p1\"}"),
None,
);

assert_eq!(
store.fk_reverse_lookup("posts", "p1", "comments", "post_id"),
vec!["c1".to_string()],
);
}

#[test]
fn update_fk_reverse_index_is_noop_without_fk_constraint() {
let store = StoreManager::new(node(1));

store.update_fk_reverse_index(
Operation::Insert,
"comments",
"c1",
Some(b"{\"post_id\":\"p1\"}"),
None,
);

assert!(
store
.fk_reverse_lookup("posts", "p1", "comments", "post_id")
.is_empty(),
"no constraint registered → no reverse index entry",
);
}

#[test]
fn update_fk_reverse_index_handles_update_repoint() {
let store = StoreManager::new(node(1));
store.db_constraints.add(fk_comments_to_posts()).unwrap();

store.update_fk_reverse_index(
Operation::Insert,
"comments",
"c1",
Some(b"{\"post_id\":\"p1\"}"),
None,
);
store.update_fk_reverse_index(
Operation::Update,
"comments",
"c1",
Some(b"{\"post_id\":\"p2\"}"),
Some(b"{\"post_id\":\"p1\"}"),
);

assert!(
store
.fk_reverse_lookup("posts", "p1", "comments", "post_id")
.is_empty(),
"update must remove the old reference",
);
assert_eq!(
store.fk_reverse_lookup("posts", "p2", "comments", "post_id"),
vec!["c1".to_string()],
"update must add the new reference",
);
}

#[test]
fn update_fk_reverse_index_removes_on_delete() {
let store = StoreManager::new(node(1));
store.db_constraints.add(fk_comments_to_posts()).unwrap();

store.update_fk_reverse_index(
Operation::Insert,
"comments",
"c1",
Some(b"{\"post_id\":\"p1\"}"),
None,
);
store.update_fk_reverse_index(
Operation::Delete,
"comments",
"c1",
None,
Some(b"{\"post_id\":\"p1\"}"),
);

assert!(
store
.fk_reverse_lookup("posts", "p1", "comments", "post_id")
.is_empty(),
"delete must drop the reverse index entry",
);
}

#[test]
fn update_fk_reverse_index_skips_malformed_payloads() {
let store = StoreManager::new(node(1));
store.db_constraints.add(fk_comments_to_posts()).unwrap();

store.update_fk_reverse_index(Operation::Insert, "comments", "c1", Some(b"not json"), None);

assert!(
store
.fk_reverse_lookup("posts", "p1", "comments", "post_id")
.is_empty(),
"malformed JSON must be silently skipped, not panic",
);
}

#[test]
fn rebuild_fk_index_for_constraint_walks_existing_records() {
let store = StoreManager::new(node(1));
let fk = fk_comments_to_posts();

store
.db_data
.create("comments", "c1", b"{\"post_id\":\"p1\"}", 1_000)
.unwrap();
store
.db_data
.create("comments", "c2", b"{\"post_id\":\"p1\"}", 1_000)
.unwrap();
store
.db_data
.create("comments", "c3", b"{\"post_id\":\"p2\"}", 1_000)
.unwrap();

store.rebuild_fk_index_for_constraint(&fk);

let mut p1_refs = store.fk_reverse_lookup("posts", "p1", "comments", "post_id");
p1_refs.sort();
assert_eq!(p1_refs, vec!["c1".to_string(), "c2".to_string()]);

assert_eq!(
store.fk_reverse_lookup("posts", "p2", "comments", "post_id"),
vec!["c3".to_string()],
);
}

#[test]
fn rebuild_fk_index_for_constraint_is_noop_for_unique_constraint() {
let store = StoreManager::new(node(1));
let unique = ClusterConstraint::unique("comments", "uniq_text", "text");

store
.db_data
.create("comments", "c1", b"{\"post_id\":\"p1\"}", 1_000)
.unwrap();

store.rebuild_fk_index_for_constraint(&unique);

assert!(
store
.fk_reverse_lookup("posts", "p1", "comments", "post_id")
.is_empty(),
"non-FK constraint must not populate the reverse index",
);
}
}