Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ enum ProbingCommands {
#[arg(help = "Measurement ID returned by 'send'")]
id: String,
},
#[command(about = "Cancel a stuck/in-progress measurement by ID")]
Cancel {
#[arg(help = "Measurement ID to cancel")]
id: String,
},
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -177,6 +182,7 @@ async fn handle_probing(command: ProbingCommands) -> anyhow::Result<()> {
ProbingCommands::Results { src_ip, since, until } => probing::results(src_ip, since, until).await,
ProbingCommands::Measurements { limit } => probing::measurements(limit).await,
ProbingCommands::MeasurementStatus { id } => probing::measurement_status(&id).await,
ProbingCommands::Cancel { id } => probing::cancel(&id).await,
}
}

Expand Down
53 changes: 49 additions & 4 deletions src/probing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,17 @@ async fn query_clickhouse(sql: &str) -> anyhow::Result<Vec<serde_json::Value>> {
.collect()
}

/// Human label for a measurement's terminal state (cancelled takes precedence).
fn measurement_label(cancelled: bool, complete: bool) -> &'static str {
if cancelled {
"cancelled"
} else if complete {
"complete"
} else {
"in progress"
}
}

pub async fn measurements(limit: u32) -> anyhow::Result<()> {
anyhow::ensure!((1..=100).contains(&limit), "--limit must be between 1 and 100");

Expand All @@ -279,6 +290,8 @@ pub async fn measurements(limit: u32) -> anyhow::Result<()> {
total_expected_probes: i64,
total_sent_probes: i64,
measurement_complete: bool,
#[serde(default)]
measurement_cancelled: bool,
started_at: String,
}

Expand All @@ -295,7 +308,7 @@ pub async fn measurements(limit: u32) -> anyhow::Result<()> {
}

let rows: Vec<Vec<String>> = measurements.iter().map(|m| {
let status = if m.measurement_complete { "complete" } else { "in progress" };
let status = measurement_label(m.measurement_cancelled, m.measurement_complete);
vec![
m.measurement_id.clone(),
m.started_at.clone(),
Expand All @@ -318,6 +331,8 @@ pub async fn measurement_status(id: &str) -> anyhow::Result<()> {
expected_probes: i64,
sent_probes: i64,
is_complete: bool,
#[serde(default)]
cancelled: bool,
}

#[derive(serde::Deserialize)]
Expand All @@ -328,6 +343,8 @@ pub async fn measurement_status(id: &str) -> anyhow::Result<()> {
total_expected_probes: i64,
total_sent_probes: i64,
measurement_complete: bool,
#[serde(default)]
measurement_cancelled: bool,
agents: Vec<AgentStatus>,
}

Expand All @@ -339,7 +356,7 @@ pub async fn measurement_status(id: &str) -> anyhow::Result<()> {
// only the per-agent table below, so the output stays a single valid block
// (one CSV header / one JSON value) instead of a summary followed by a table.
if output::is_text() {
let overall = if status.measurement_complete { "complete" } else { "in progress" };
let overall = measurement_label(status.measurement_cancelled, status.measurement_complete);
output::section("measurement");
output::kv(&[
("id", &status.measurement_id),
Expand All @@ -351,19 +368,47 @@ pub async fn measurement_status(id: &str) -> anyhow::Result<()> {

if !status.agents.is_empty() {
let rows: Vec<Vec<String>> = status.agents.iter().map(|a| {
let done = if a.is_complete { "yes" } else { "no" };
let done = if a.cancelled { "cancelled" } else if a.is_complete { "yes" } else { "no" };
vec![
a.agent_id.clone(),
format!("{}/{}", a.sent_probes, a.expected_probes),
done.to_string(),
]
}).collect();
output::table(&["agent", "probes sent/expected", "complete"], &rows);
output::table(&["agent", "probes sent/expected", "status"], &rows);
}

Ok(())
}

pub async fn cancel(id: &str) -> anyhow::Result<()> {
#[derive(Deserialize)]
struct CancelResponse {
cancelled: bool,
agents_cancelled: u64,
message: String,
}

let resp: CancelResponse = api::ApiClient::new_saimiris()
.post(&format!("/api/measurement/{id}/cancel"), &serde_json::json!({}))
.await?;

if resp.cancelled {
output::success(&resp.message);
} else {
output::info(&resp.message);
}
output::kv(&[
("id", id),
("cancelled", &resp.cancelled.to_string()),
("agents_cancelled", &resp.agents_cancelled.to_string()),
("message", &resp.message),
]);
output::hint(&format!("nxthdr probing measurement-status {id}"));

Ok(())
}

/// Generate 48 random bits to use as the host part of a /80 source address.
/// All agents in a measurement share the same value so the replies are
/// identifiable as a group without any server-side state.
Expand Down