diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ae90e1..70ac8dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file. Each entry lists the date and the crate versions that were released. +## 2026-05-06 — mqdb-cli 0.7.6 + +### Fixed + +- `--timeout` did not apply during the MQTT CONNECT handshake. `connect_client` in `crates/mqdb-cli/src/common.rs` only wrapped the request/response wait in `tokio::time::timeout`; the `MqttClient::connect[_with_options]` calls themselves had no timeout, so any command (`mqdb list`, `read`, `create`, `update`, `delete`, etc.) would hang indefinitely against a TCP listener that accepts the connection but never sends CONNACK (silent broker, half-open NAT, firewall drop after SYN-ACK). +- Extracted a shared `connect_with_timeout(client, client_id, conn)` helper in `common.rs` that wraps both `MqttClient::connect[_with_options]` calls in `tokio::time::timeout(Duration::from_secs(conn.timeout), …)` and surfaces `connect to {broker} timed out after {N}s` on expiry. The helper also honors `conn.insecure` for self-signed TLS — previously only the bench paths set this, the CRUD path silently skipped it. +- Routed every CLI bench/dev_bench connect through the new helper to close the same bug class for `mqdb bench db` (sync + async + cascade + unique + changefeed), `mqdb bench pubsub`, `mqdb dev bench` (db/pubsub/sub-pub), and the broker-readiness probes in both `bench/common.rs::wait_for_broker_ready` and `dev_bench/helpers.rs::wait_for_broker_ready`. Removed two now-redundant local `connect_client` helpers in `db_cascade.rs` and `db_changefeed.rs`. The `pubsub.rs` paths use custom `ConnectOptions` (clean-start, custom keep-alive) so their connect calls are wrapped inline with the same timeout pattern rather than going through the helper. +- Regression test `test_cli_connect_timeout_against_silent_listener` in `crates/mqdb-cli/tests/cli_test.rs` spawns a TCP listener that accepts the connection without speaking MQTT and asserts `mqdb list ... --timeout 2` exits within 5 seconds with a "timed out" error. Verified to fail on main (pre-fix exits at ~6s with "Connection reset by peer") and pass with the fix in place. + ## 2026-05-03 — mqdb-cluster 0.3.3 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 19c57c7..80b495c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1384,7 +1384,7 @@ dependencies = [ [[package]] name = "mqdb-cli" -version = "0.7.5" +version = "0.7.6" dependencies = [ "base64", "bebytes", diff --git a/crates/mqdb-cli/Cargo.toml b/crates/mqdb-cli/Cargo.toml index b2fb574..022f579 100644 --- a/crates/mqdb-cli/Cargo.toml +++ b/crates/mqdb-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqdb-cli" -version = "0.7.5" +version = "0.7.6" publish = false edition.workspace = true license = "AGPL-3.0-only" diff --git a/crates/mqdb-cli/src/commands/bench/common.rs b/crates/mqdb-cli/src/commands/bench/common.rs index 15c0c3d..98755b6 100644 --- a/crates/mqdb-cli/src/commands/bench/common.rs +++ b/crates/mqdb-cli/src/commands/bench/common.rs @@ -4,10 +4,11 @@ use std::time::Duration; use mqtt5::client::MqttClient; -use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties}; +use mqtt5::types::{PublishOptions, PublishProperties}; use serde_json::{Value, json}; use crate::cli_types::{ConnectionArgs, OutputFormat}; +use crate::common::connect_with_timeout; pub(crate) struct BenchDbArgs { pub(crate) operations: u64, @@ -129,17 +130,9 @@ pub(crate) async fn wait_for_broker_ready( let client_id = format!("bench-ready-{}", uuid::Uuid::new_v4()); let client = MqttClient::new(&client_id); - if conn.insecure { - client.set_insecure_tls(true).await; - } - let connected = if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) { - let opts = ConnectOptions::new(&client_id).with_credentials(user.clone(), pass.clone()); - Box::pin(client.connect_with_options(&conn.broker, opts)) - .await - .is_ok() - } else { - client.connect(&conn.broker).await.is_ok() - }; + let connected = connect_with_timeout(&client, &client_id, conn) + .await + .is_ok(); if connected { let response_topic = format!("bench-ready/{}", uuid::Uuid::new_v4()); diff --git a/crates/mqdb-cli/src/commands/bench/db_async.rs b/crates/mqdb-cli/src/commands/bench/db_async.rs index 619f93f..12f36ba 100644 --- a/crates/mqdb-cli/src/commands/bench/db_async.rs +++ b/crates/mqdb-cli/src/commands/bench/db_async.rs @@ -6,10 +6,11 @@ use std::time::Duration; use mqtt5::QoS; use mqtt5::client::MqttClient; -use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties}; +use mqtt5::types::{PublishOptions, PublishProperties}; use serde_json::{Value, json}; use crate::cli_types::OutputFormat; +use crate::common::connect_with_timeout; use super::common::{BenchDbArgs, DbOp, generate_record}; @@ -54,15 +55,7 @@ pub(super) async fn cmd_bench_db_async( let client_id = format!("bench-db-async-{}", uuid::Uuid::new_v4()); let client = MqttClient::new(client_id.clone()); - if args.conn.insecure { - client.set_insecure_tls(true).await; - } - if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) { - let opts = ConnectOptions::new(&client_id).with_credentials(user.clone(), pass.clone()); - Box::pin(client.connect_with_options(&args.conn.broker, opts)).await?; - } else { - client.connect(&args.conn.broker).await?; - } + connect_with_timeout(&client, &client_id, &args.conn).await?; let seeded_ids: Arc> = if matches!(op, DbOp::Get | DbOp::Update | DbOp::Delete) { let seed_count = args.seed.max(1000); diff --git a/crates/mqdb-cli/src/commands/bench/db_cascade.rs b/crates/mqdb-cli/src/commands/bench/db_cascade.rs index 95a9a87..58bb5a4 100644 --- a/crates/mqdb-cli/src/commands/bench/db_cascade.rs +++ b/crates/mqdb-cli/src/commands/bench/db_cascade.rs @@ -6,11 +6,12 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use mqtt5::client::MqttClient; -use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties}; +use mqtt5::types::{PublishOptions, PublishProperties}; use serde_json::{Value, json}; use super::common::{BenchDbArgs, wait_for_broker_ready}; use crate::cli_types::{ConnectionArgs, OutputFormat}; +use crate::common::connect_with_timeout; #[allow( clippy::too_many_lines, @@ -36,7 +37,7 @@ pub(crate) async fn cmd_bench_db_cascade( let setup_client_id = format!("bench-cascade-setup-{}", uuid::Uuid::new_v4()); let setup_client = MqttClient::new(setup_client_id.clone()); - connect_client(&setup_client, &setup_client_id, &args.conn).await?; + connect_with_timeout(&setup_client, &setup_client_id, &args.conn).await?; let delete_latencies_ns: Vec = Vec::new(); let propagation_latencies_ns: Vec = Vec::new(); @@ -72,7 +73,7 @@ pub(crate) async fn cmd_bench_db_cascade( let sub_client_id = format!("bench-cascade-sub-{run}-{}", uuid::Uuid::new_v4()); let sub_client = MqttClient::new(sub_client_id.clone()); - connect_client(&sub_client, &sub_client_id, &args.conn).await?; + connect_with_timeout(&sub_client, &sub_client_id, &args.conn).await?; let events_topic = format!("$DB/{child_entity}/events/#"); let pending_children: Arc>> = @@ -213,23 +214,6 @@ pub(crate) async fn cmd_bench_db_cascade( Ok(()) } -async fn connect_client( - client: &MqttClient, - client_id: &str, - conn: &ConnectionArgs, -) -> Result<(), Box> { - if conn.insecure { - client.set_insecure_tls(true).await; - } - if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) { - let opts = ConnectOptions::new(client_id).with_credentials(user.clone(), pass.clone()); - Box::pin(client.connect_with_options(&conn.broker, opts)).await?; - } else { - client.connect(&conn.broker).await?; - } - Ok(()) -} - async fn register_fk_cascade( conn: &ConnectionArgs, parent_entity: &str, @@ -237,7 +221,7 @@ async fn register_fk_cascade( ) -> Result<(), Box> { let client_id = format!("bench-cascade-admin-{}", uuid::Uuid::new_v4()); let client = MqttClient::new(client_id.clone()); - connect_client(&client, &client_id, conn).await?; + connect_with_timeout(&client, &client_id, conn).await?; let response_topic = format!("bench-cascade-admin/resp/{}", uuid::Uuid::new_v4()); let (tx, rx) = flume::bounded::>(4); diff --git a/crates/mqdb-cli/src/commands/bench/db_changefeed.rs b/crates/mqdb-cli/src/commands/bench/db_changefeed.rs index d75b6b6..b120506 100644 --- a/crates/mqdb-cli/src/commands/bench/db_changefeed.rs +++ b/crates/mqdb-cli/src/commands/bench/db_changefeed.rs @@ -7,11 +7,11 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use mqtt5::client::MqttClient; -use mqtt5::types::ConnectOptions; use serde_json::{Value, json}; use super::common::{BenchDbArgs, generate_record, wait_for_broker_ready}; use crate::cli_types::OutputFormat; +use crate::common::connect_with_timeout; #[allow( clippy::too_many_lines, @@ -33,11 +33,11 @@ pub(crate) async fn cmd_bench_db_changefeed( let sub_client_name = format!("bench-changefeed-sub-{}", uuid::Uuid::new_v4()); let sub_client = MqttClient::new(sub_client_name.clone()); - connect_client(&sub_client, &sub_client_name, &args).await?; + connect_with_timeout(&sub_client, &sub_client_name, &args.conn).await?; let pub_client_name = format!("bench-changefeed-pub-{}", uuid::Uuid::new_v4()); let pub_client = MqttClient::new(pub_client_name.clone()); - connect_client(&pub_client, &pub_client_name, &args).await?; + connect_with_timeout(&pub_client, &pub_client_name, &args.conn).await?; let write_sent_ns: Arc>> = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -182,23 +182,6 @@ pub(crate) async fn cmd_bench_db_changefeed( Ok(()) } -async fn connect_client( - client: &MqttClient, - client_id: &str, - args: &BenchDbArgs, -) -> Result<(), Box> { - if args.conn.insecure { - client.set_insecure_tls(true).await; - } - if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) { - let opts = ConnectOptions::new(client_id).with_credentials(user.clone(), pass.clone()); - Box::pin(client.connect_with_options(&args.conn.broker, opts)).await?; - } else { - client.connect(&args.conn.broker).await?; - } - Ok(()) -} - fn extract_event_id(payload: &Value) -> Option { if let Some(s) = payload.get("id").and_then(Value::as_str) { return Some(s.to_string()); diff --git a/crates/mqdb-cli/src/commands/bench/db_sync.rs b/crates/mqdb-cli/src/commands/bench/db_sync.rs index 705a26c..0b08e8b 100644 --- a/crates/mqdb-cli/src/commands/bench/db_sync.rs +++ b/crates/mqdb-cli/src/commands/bench/db_sync.rs @@ -7,12 +7,13 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use mqtt5::client::MqttClient; -use mqtt5::types::{ConnectOptions, Message, PublishOptions, PublishProperties}; +use mqtt5::types::{Message, PublishOptions, PublishProperties}; use serde_json::{Value, json}; use super::common::{BenchDbArgs, DbBenchMetrics, DbOp, generate_record, wait_for_broker_ready}; use super::db_async::cmd_bench_db_async; use crate::cli_types::OutputFormat; +use crate::common::connect_with_timeout; type PendingMap = Arc>>>>; @@ -148,13 +149,7 @@ pub(crate) async fn cmd_bench_db(args: BenchDbArgs) -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { let client_id = format!("bench-unique-admin-{}", uuid::Uuid::new_v4()); let client = MqttClient::new(client_id.clone()); - if args.conn.insecure { - client.set_insecure_tls(true).await; - } - if let (Some(user), Some(pass)) = (&args.conn.user, &args.conn.pass) { - let opts = ConnectOptions::new(client_id).with_credentials(user.clone(), pass.clone()); - Box::pin(client.connect_with_options(&args.conn.broker, opts)).await?; - } else { - client.connect(&args.conn.broker).await?; - } + connect_with_timeout(&client, &client_id, &args.conn).await?; let response_topic = format!("bench-unique-admin/resp/{}", uuid::Uuid::new_v4()); let (tx, rx) = flume::bounded::>(1); diff --git a/crates/mqdb-cli/src/commands/bench/pubsub.rs b/crates/mqdb-cli/src/commands/bench/pubsub.rs index 3deb841..b637ded 100644 --- a/crates/mqdb-cli/src/commands/bench/pubsub.rs +++ b/crates/mqdb-cli/src/commands/bench/pubsub.rs @@ -159,9 +159,25 @@ pub(crate) async fn cmd_bench_pubsub( opts }; - if let Err(e) = Box::pin(client.connect_with_options(&broker, opts)).await { - eprintln!("Subscriber {sub_id} connect failed: {e}"); - return; + let connect_timeout = Duration::from_secs(conn.timeout); + match tokio::time::timeout( + connect_timeout, + Box::pin(client.connect_with_options(&broker, opts)), + ) + .await + { + Err(_) => { + eprintln!( + "Subscriber {sub_id} connect to {broker} timed out after {}s", + conn.timeout + ); + return; + } + Ok(Err(e)) => { + eprintln!("Subscriber {sub_id} connect failed: {e}"); + return; + } + Ok(Ok(_)) => {} } if use_wildcard && topic_count > 1 { @@ -246,9 +262,25 @@ pub(crate) async fn cmd_bench_pubsub( opts }; - if let Err(e) = Box::pin(client.connect_with_options(&broker, opts)).await { - eprintln!("Publisher {pub_id} connect failed: {e}"); - return; + let connect_timeout = Duration::from_secs(conn.timeout); + match tokio::time::timeout( + connect_timeout, + Box::pin(client.connect_with_options(&broker, opts)), + ) + .await + { + Err(_) => { + eprintln!( + "Publisher {pub_id} connect to {broker} timed out after {}s", + conn.timeout + ); + return; + } + Ok(Err(e)) => { + eprintln!("Publisher {pub_id} connect failed: {e}"); + return; + } + Ok(Ok(_)) => {} } let mut topic_idx: usize = 0; diff --git a/crates/mqdb-cli/src/commands/dev_bench/helpers.rs b/crates/mqdb-cli/src/commands/dev_bench/helpers.rs index 3ea8993..0232225 100644 --- a/crates/mqdb-cli/src/commands/dev_bench/helpers.rs +++ b/crates/mqdb-cli/src/commands/dev_bench/helpers.rs @@ -5,9 +5,10 @@ use std::process::Command; use std::time::Duration; use mqtt5::client::MqttClient; -use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties}; +use mqtt5::types::{PublishOptions, PublishProperties}; use crate::cli_types::ConnectionArgs; +use crate::common::connect_with_timeout; pub(super) fn is_agent_running() -> bool { Command::new("pgrep") @@ -65,17 +66,9 @@ pub(super) async fn wait_for_broker_ready( let client_id = format!("bench-ready-{}", uuid::Uuid::new_v4()); let client = MqttClient::new(&client_id); - if conn.insecure { - client.set_insecure_tls(true).await; - } - let connected = if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) { - let opts = ConnectOptions::new(&client_id).with_credentials(user.clone(), pass.clone()); - Box::pin(client.connect_with_options(&conn.broker, opts)) - .await - .is_ok() - } else { - client.connect(&conn.broker).await.is_ok() - }; + let connected = connect_with_timeout(&client, &client_id, conn) + .await + .is_ok(); if connected { let response_topic = format!("bench-ready/{}", uuid::Uuid::new_v4()); diff --git a/crates/mqdb-cli/src/commands/dev_bench/runners.rs b/crates/mqdb-cli/src/commands/dev_bench/runners.rs index a8e8e0a..b89cea0 100644 --- a/crates/mqdb-cli/src/commands/dev_bench/runners.rs +++ b/crates/mqdb-cli/src/commands/dev_bench/runners.rs @@ -11,6 +11,7 @@ use mqtt5::types::{PublishOptions, PublishProperties}; use super::DevBenchResult; use super::helpers::{BenchMetrics, chrono_timestamp}; use crate::cli_types::{ConnectionArgs, DevBenchScenario}; +use crate::common::connect_with_timeout; #[allow( clippy::cast_precision_loss, @@ -44,7 +45,10 @@ pub(super) async fn run_pubsub_benchmark( let handle = tokio::spawn(async move { let client_id = format!("dev-bench-sub-{sub_id}"); let client = MqttClient::new(&client_id); - if client.connect(&conn.broker).await.is_err() { + if connect_with_timeout(&client, &client_id, &conn) + .await + .is_err() + { return; } @@ -92,7 +96,10 @@ pub(super) async fn run_pubsub_benchmark( let handle = tokio::spawn(async move { let client_id = format!("dev-bench-pub-{pub_id}"); let client = MqttClient::new(&client_id); - if client.connect(&conn.broker).await.is_err() { + if connect_with_timeout(&client, &client_id, &conn) + .await + .is_err() + { return; } @@ -174,7 +181,10 @@ pub(super) async fn run_db_benchmark( let handle = tokio::spawn(async move { let client_id = format!("dev-bench-db-{client_id_num}"); let client = MqttClient::new(&client_id); - if client.connect(&conn.broker).await.is_err() { + if connect_with_timeout(&client, &client_id, &conn) + .await + .is_err() + { return; } diff --git a/crates/mqdb-cli/src/common.rs b/crates/mqdb-cli/src/common.rs index 28018dd..c0a5427 100644 --- a/crates/mqdb-cli/src/common.rs +++ b/crates/mqdb-cli/src/common.rs @@ -7,19 +7,45 @@ use mqtt5::types::{ConnectOptions, PublishOptions, PublishProperties}; use serde_json::{Value, json}; use std::time::Duration; +pub(crate) async fn connect_with_timeout( + client: &MqttClient, + client_id: &str, + conn: &ConnectionArgs, +) -> Result<(), Box> { + if conn.insecure { + client.set_insecure_tls(true).await; + } + + let connect_timeout = Duration::from_secs(conn.timeout); + let connect_future = async { + if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) { + let options = + ConnectOptions::new(client_id).with_credentials(user.clone(), pass.clone()); + Box::pin(client.connect_with_options(&conn.broker, options)) + .await + .map(|_| ()) + } else { + client.connect(&conn.broker).await + } + }; + tokio::time::timeout(connect_timeout, connect_future) + .await + .map_err(|_| { + format!( + "connect to {} timed out after {}s", + conn.broker, conn.timeout + ) + })??; + + Ok(()) +} + pub(crate) async fn connect_client( conn: &ConnectionArgs, ) -> Result> { let client_id = format!("mqdb-cli-{}", uuid::Uuid::new_v4()); let client = MqttClient::new(&client_id); - - if let (Some(user), Some(pass)) = (&conn.user, &conn.pass) { - let options = ConnectOptions::new(&client_id).with_credentials(user.clone(), pass.clone()); - Box::pin(client.connect_with_options(&conn.broker, options)).await?; - } else { - client.connect(&conn.broker).await?; - } - + connect_with_timeout(&client, &client_id, conn).await?; Ok(client) } diff --git a/crates/mqdb-cli/tests/cli_test.rs b/crates/mqdb-cli/tests/cli_test.rs index 7ca89ba..073814f 100644 --- a/crates/mqdb-cli/tests/cli_test.rs +++ b/crates/mqdb-cli/tests/cli_test.rs @@ -212,6 +212,52 @@ async fn test_cli_update_and_delete() { handle.abort(); } +#[tokio::test] +async fn test_cli_connect_timeout_against_silent_listener() { + use std::net::TcpListener; + use std::time::Instant; + + let listener = TcpListener::bind("127.0.0.1:0").expect("bind silent listener"); + let port = listener.local_addr().unwrap().port(); + + let _detached = std::thread::spawn(move || { + if let Ok((stream, _)) = listener.accept() { + std::thread::sleep(std::time::Duration::from_secs(6)); + drop(stream); + } + }); + + let start = Instant::now(); + let (success, stdout, stderr) = run_mqdb(&[ + "list", + "users", + "--broker", + &format!("127.0.0.1:{port}"), + "--user", + "x", + "--pass", + "y", + "--timeout", + "2", + ]) + .await; + let elapsed = start.elapsed(); + + assert!( + !success, + "command must fail when broker never sends CONNACK: stdout={stdout}, stderr={stderr}", + ); + let combined = format!("{stdout}{stderr}"); + assert!( + combined.contains("timed out"), + "error message must mention timeout: stdout={stdout}, stderr={stderr}", + ); + assert!( + elapsed.as_secs_f64() < 5.0, + "must exit within timeout window, took {elapsed:?}", + ); +} + #[tokio::test] async fn test_cli_bench_db() { let port = next_test_port();