Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file.

Each entry lists the date and the crate versions that were released.

## 2026-05-06 — mqdb-cli 0.7.6

### Fixed

- `--timeout` did not apply during the MQTT CONNECT handshake. `connect_client` in `crates/mqdb-cli/src/common.rs` only wrapped the request/response wait in `tokio::time::timeout`; the `MqttClient::connect[_with_options]` calls themselves had no timeout, so any command (`mqdb list`, `read`, `create`, `update`, `delete`, etc.) would hang indefinitely against a TCP listener that accepts the connection but never sends CONNACK (silent broker, half-open NAT, firewall drop after SYN-ACK).
- Extracted a shared `connect_with_timeout(client, client_id, conn)` helper in `common.rs` that wraps both `MqttClient::connect[_with_options]` calls in `tokio::time::timeout(Duration::from_secs(conn.timeout), …)` and surfaces `connect to {broker} timed out after {N}s` on expiry. The helper also honors `conn.insecure` for self-signed TLS — previously only the bench paths set this, the CRUD path silently skipped it.
- Routed every CLI bench/dev_bench connect through the new helper to close the same bug class for `mqdb bench db` (sync + async + cascade + unique + changefeed), `mqdb bench pubsub`, `mqdb dev bench` (db/pubsub/sub-pub), and the broker-readiness probes in both `bench/common.rs::wait_for_broker_ready` and `dev_bench/helpers.rs::wait_for_broker_ready`. Removed two now-redundant local `connect_client` helpers in `db_cascade.rs` and `db_changefeed.rs`. The `pubsub.rs` paths use custom `ConnectOptions` (clean-start, custom keep-alive) so their connect calls are wrapped inline with the same timeout pattern rather than going through the helper.
- Regression test `test_cli_connect_timeout_against_silent_listener` in `crates/mqdb-cli/tests/cli_test.rs` spawns a TCP listener that accepts the connection without speaking MQTT and asserts `mqdb list ... --timeout 2` exits within 5 seconds with a "timed out" error. Verified to fail on main (pre-fix exits at ~6s with "Connection reset by peer") and pass with the fix in place.

## 2026-05-03 — mqdb-cluster 0.3.3

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/mqdb-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqdb-cli"
version = "0.7.5"
version = "0.7.6"
publish = false
edition.workspace = true
license = "AGPL-3.0-only"
Expand Down
17 changes: 5 additions & 12 deletions crates/mqdb-cli/src/commands/bench/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
use std::time::Duration;

use mqtt5::client::MqttClient;
use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties};
use mqtt5::types::{PublishOptions, PublishProperties};
use serde_json::{Value, json};

use crate::cli_types::{ConnectionArgs, OutputFormat};
use crate::common::connect_with_timeout;

pub(crate) struct BenchDbArgs {
pub(crate) operations: u64,
Expand Down Expand Up @@ -129,17 +130,9 @@ pub(crate) async fn wait_for_broker_ready(

let client_id = format!("bench-ready-{}", uuid::Uuid::new_v4());
let client = MqttClient::new(&client_id);
if conn.insecure {
client.set_insecure_tls(true).await;
}
let connected = if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) {
let opts = ConnectOptions::new(&client_id).with_credentials(user.clone(), pass.clone());
Box::pin(client.connect_with_options(&conn.broker, opts))
.await
.is_ok()
} else {
client.connect(&conn.broker).await.is_ok()
};
let connected = connect_with_timeout(&client, &client_id, conn)
.await
.is_ok();

if connected {
let response_topic = format!("bench-ready/{}", uuid::Uuid::new_v4());
Expand Down
13 changes: 3 additions & 10 deletions crates/mqdb-cli/src/commands/bench/db_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use std::time::Duration;

use mqtt5::QoS;
use mqtt5::client::MqttClient;
use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties};
use mqtt5::types::{PublishOptions, PublishProperties};
use serde_json::{Value, json};

use crate::cli_types::OutputFormat;
use crate::common::connect_with_timeout;

use super::common::{BenchDbArgs, DbOp, generate_record};

Expand Down Expand Up @@ -54,15 +55,7 @@ pub(super) async fn cmd_bench_db_async(

let client_id = format!("bench-db-async-{}", uuid::Uuid::new_v4());
let client = MqttClient::new(client_id.clone());
if args.conn.insecure {
client.set_insecure_tls(true).await;
}
if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) {
let opts = ConnectOptions::new(&client_id).with_credentials(user.clone(), pass.clone());
Box::pin(client.connect_with_options(&args.conn.broker, opts)).await?;
} else {
client.connect(&args.conn.broker).await?;
}
connect_with_timeout(&client, &client_id, &args.conn).await?;

let seeded_ids: Arc<Vec<String>> = if matches!(op, DbOp::Get | DbOp::Update | DbOp::Delete) {
let seed_count = args.seed.max(1000);
Expand Down
26 changes: 5 additions & 21 deletions crates/mqdb-cli/src/commands/bench/db_cascade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use mqtt5::client::MqttClient;
use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties};
use mqtt5::types::{PublishOptions, PublishProperties};
use serde_json::{Value, json};

use super::common::{BenchDbArgs, wait_for_broker_ready};
use crate::cli_types::{ConnectionArgs, OutputFormat};
use crate::common::connect_with_timeout;

#[allow(
clippy::too_many_lines,
Expand All @@ -36,7 +37,7 @@ pub(crate) async fn cmd_bench_db_cascade(

let setup_client_id = format!("bench-cascade-setup-{}", uuid::Uuid::new_v4());
let setup_client = MqttClient::new(setup_client_id.clone());
connect_client(&setup_client, &setup_client_id, &args.conn).await?;
connect_with_timeout(&setup_client, &setup_client_id, &args.conn).await?;

let delete_latencies_ns: Vec<u64> = Vec::new();
let propagation_latencies_ns: Vec<u64> = Vec::new();
Expand Down Expand Up @@ -72,7 +73,7 @@ pub(crate) async fn cmd_bench_db_cascade(

let sub_client_id = format!("bench-cascade-sub-{run}-{}", uuid::Uuid::new_v4());
let sub_client = MqttClient::new(sub_client_id.clone());
connect_client(&sub_client, &sub_client_id, &args.conn).await?;
connect_with_timeout(&sub_client, &sub_client_id, &args.conn).await?;

let events_topic = format!("$DB/{child_entity}/events/#");
let pending_children: Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
Expand Down Expand Up @@ -213,31 +214,14 @@ pub(crate) async fn cmd_bench_db_cascade(
Ok(())
}

async fn connect_client(
client: &MqttClient,
client_id: &str,
conn: &ConnectionArgs,
) -> Result<(), Box<dyn std::error::Error>> {
if conn.insecure {
client.set_insecure_tls(true).await;
}
if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) {
let opts = ConnectOptions::new(client_id).with_credentials(user.clone(), pass.clone());
Box::pin(client.connect_with_options(&conn.broker, opts)).await?;
} else {
client.connect(&conn.broker).await?;
}
Ok(())
}

async fn register_fk_cascade(
conn: &ConnectionArgs,
parent_entity: &str,
child_entity: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let client_id = format!("bench-cascade-admin-{}", uuid::Uuid::new_v4());
let client = MqttClient::new(client_id.clone());
connect_client(&client, &client_id, conn).await?;
connect_with_timeout(&client, &client_id, conn).await?;

let response_topic = format!("bench-cascade-admin/resp/{}", uuid::Uuid::new_v4());
let (tx, rx) = flume::bounded::<Vec<u8>>(4);
Expand Down
23 changes: 3 additions & 20 deletions crates/mqdb-cli/src/commands/bench/db_changefeed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use mqtt5::client::MqttClient;
use mqtt5::types::ConnectOptions;
use serde_json::{Value, json};

use super::common::{BenchDbArgs, generate_record, wait_for_broker_ready};
use crate::cli_types::OutputFormat;
use crate::common::connect_with_timeout;

#[allow(
clippy::too_many_lines,
Expand All @@ -33,11 +33,11 @@ pub(crate) async fn cmd_bench_db_changefeed(

let sub_client_name = format!("bench-changefeed-sub-{}", uuid::Uuid::new_v4());
let sub_client = MqttClient::new(sub_client_name.clone());
connect_client(&sub_client, &sub_client_name, &args).await?;
connect_with_timeout(&sub_client, &sub_client_name, &args.conn).await?;

let pub_client_name = format!("bench-changefeed-pub-{}", uuid::Uuid::new_v4());
let pub_client = MqttClient::new(pub_client_name.clone());
connect_client(&pub_client, &pub_client_name, &args).await?;
connect_with_timeout(&pub_client, &pub_client_name, &args.conn).await?;

let write_sent_ns: Arc<std::sync::Mutex<HashMap<String, u64>>> =
Arc::new(std::sync::Mutex::new(HashMap::new()));
Expand Down Expand Up @@ -182,23 +182,6 @@ pub(crate) async fn cmd_bench_db_changefeed(
Ok(())
}

async fn connect_client(
client: &MqttClient,
client_id: &str,
args: &BenchDbArgs,
) -> Result<(), Box<dyn std::error::Error>> {
if args.conn.insecure {
client.set_insecure_tls(true).await;
}
if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) {
let opts = ConnectOptions::new(client_id).with_credentials(user.clone(), pass.clone());
Box::pin(client.connect_with_options(&args.conn.broker, opts)).await?;
} else {
client.connect(&args.conn.broker).await?;
}
Ok(())
}

fn extract_event_id(payload: &Value) -> Option<String> {
if let Some(s) = payload.get("id").and_then(Value::as_str) {
return Some(s.to_string());
Expand Down
32 changes: 7 additions & 25 deletions crates/mqdb-cli/src/commands/bench/db_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use mqtt5::client::MqttClient;
use mqtt5::types::{ConnectOptions, Message, PublishOptions, PublishProperties};
use mqtt5::types::{Message, PublishOptions, PublishProperties};
use serde_json::{Value, json};

use super::common::{BenchDbArgs, DbBenchMetrics, DbOp, generate_record, wait_for_broker_ready};
use super::db_async::cmd_bench_db_async;
use crate::cli_types::OutputFormat;
use crate::common::connect_with_timeout;

type PendingMap = Arc<std::sync::Mutex<HashMap<u64, flume::Sender<Option<String>>>>>;

Expand Down Expand Up @@ -148,13 +149,7 @@ pub(crate) async fn cmd_bench_db(args: BenchDbArgs) -> Result<(), Box<dyn std::e
println!("Seeding {} records...", args.seed);
let seeder_id = "bench-db-seeder".to_string();
let client = MqttClient::new(seeder_id.clone());
if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) {
let opts =
ConnectOptions::new(seeder_id.clone()).with_credentials(user.clone(), pass.clone());
Box::pin(client.connect_with_options(&args.conn.broker, opts)).await?;
} else {
client.connect(&args.conn.broker).await?;
}
connect_with_timeout(&client, &seeder_id, &args.conn).await?;

let router = ResponseRouter::new(&seeder_id);
client
Expand Down Expand Up @@ -209,14 +204,7 @@ pub(crate) async fn cmd_bench_db(args: BenchDbArgs) -> Result<(), Box<dyn std::e
let client_name = format!("bench-db-{client_id}");
let client = MqttClient::new(client_name.clone());

if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) {
let opts = ConnectOptions::new(client_name.clone())
.with_credentials(user.clone(), pass.clone());
if let Err(e) = Box::pin(client.connect_with_options(&conn.broker, opts)).await {
eprintln!("Client {client_id} connect failed: {e}");
return;
}
} else if let Err(e) = client.connect(&conn.broker).await {
if let Err(e) = connect_with_timeout(&client, &client_name, &conn).await {
eprintln!("Client {client_id} connect failed: {e}");
return;
}
Expand Down Expand Up @@ -401,15 +389,9 @@ pub(crate) async fn cmd_bench_db(args: BenchDbArgs) -> Result<(), Box<dyn std::e
if !ids_to_cleanup.is_empty() {
let cleanup_id = "bench-cleanup".to_string();
let client = MqttClient::new(cleanup_id.clone());
let connected = if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) {
let opts =
ConnectOptions::new(cleanup_id).with_credentials(user.clone(), pass.clone());
Box::pin(client.connect_with_options(&args.conn.broker, opts))
.await
.is_ok()
} else {
client.connect(&args.conn.broker).await.is_ok()
};
let connected = connect_with_timeout(&client, &cleanup_id, &args.conn)
.await
.is_ok();
if connected {
for id in &ids_to_cleanup {
let topic = format!("$DB/{}/{id}/delete", args.entity);
Expand Down
30 changes: 7 additions & 23 deletions crates/mqdb-cli/src/commands/bench/db_unique.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use mqtt5::client::MqttClient;
use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties};
use mqtt5::types::{PublishOptions, PublishProperties};
use serde_json::{Value, json};

use super::common::{BenchDbArgs, wait_for_broker_ready};
use crate::cli_types::OutputFormat;
use crate::common::connect_with_timeout;

const UNIQUE_FIELD_NAME: &str = "bench_unique_field";
const UNIQUE_FIELD_VALUE: &str = "contested-value";
Expand Down Expand Up @@ -57,19 +58,10 @@ pub(crate) async fn cmd_bench_db_unique(
let handle = tokio::spawn(async move {
let client_id = format!("bench-unique-{client_idx}-{}", uuid::Uuid::new_v4());
let client = MqttClient::new(client_id.clone());
if conn.insecure {
client.set_insecure_tls(true).await;
}
if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) {
let opts = ConnectOptions::new(client_id.clone())
.with_credentials(user.clone(), pass.clone());
if Box::pin(client.connect_with_options(&conn.broker, opts))
.await
.is_err()
{
return;
}
} else if client.connect(&conn.broker).await.is_err() {
if connect_with_timeout(&client, &client_id, &conn)
.await
.is_err()
{
return;
}

Expand Down Expand Up @@ -208,15 +200,7 @@ pub(crate) async fn cmd_bench_db_unique(
async fn register_unique_constraint(args: &BenchDbArgs) -> Result<(), Box<dyn std::error::Error>> {
let client_id = format!("bench-unique-admin-{}", uuid::Uuid::new_v4());
let client = MqttClient::new(client_id.clone());
if args.conn.insecure {
client.set_insecure_tls(true).await;
}
if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) {
let opts = ConnectOptions::new(client_id).with_credentials(user.clone(), pass.clone());
Box::pin(client.connect_with_options(&args.conn.broker, opts)).await?;
} else {
client.connect(&args.conn.broker).await?;
}
connect_with_timeout(&client, &client_id, &args.conn).await?;

let response_topic = format!("bench-unique-admin/resp/{}", uuid::Uuid::new_v4());
let (tx, rx) = flume::bounded::<Vec<u8>>(1);
Expand Down
44 changes: 38 additions & 6 deletions crates/mqdb-cli/src/commands/bench/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,25 @@ pub(crate) async fn cmd_bench_pubsub(
opts
};

if let Err(e) = Box::pin(client.connect_with_options(&broker, opts)).await {
eprintln!("Subscriber {sub_id} connect failed: {e}");
return;
let connect_timeout = Duration::from_secs(conn.timeout);
match tokio::time::timeout(
connect_timeout,
Box::pin(client.connect_with_options(&broker, opts)),
)
.await
{
Err(_) => {
eprintln!(
"Subscriber {sub_id} connect to {broker} timed out after {}s",
conn.timeout
);
return;
}
Ok(Err(e)) => {
eprintln!("Subscriber {sub_id} connect failed: {e}");
return;
}
Ok(Ok(_)) => {}
}

if use_wildcard && topic_count > 1 {
Expand Down Expand Up @@ -246,9 +262,25 @@ pub(crate) async fn cmd_bench_pubsub(
opts
};

if let Err(e) = Box::pin(client.connect_with_options(&broker, opts)).await {
eprintln!("Publisher {pub_id} connect failed: {e}");
return;
let connect_timeout = Duration::from_secs(conn.timeout);
match tokio::time::timeout(
connect_timeout,
Box::pin(client.connect_with_options(&broker, opts)),
)
.await
{
Err(_) => {
eprintln!(
"Publisher {pub_id} connect to {broker} timed out after {}s",
conn.timeout
);
return;
}
Ok(Err(e)) => {
eprintln!("Publisher {pub_id} connect failed: {e}");
return;
}
Ok(Ok(_)) => {}
}

let mut topic_idx: usize = 0;
Expand Down
Loading
Loading