diff --git a/src/bin/api/config.rs b/src/bin/api/config.rs index 3e8e6148..015ad71d 100644 --- a/src/bin/api/config.rs +++ b/src/bin/api/config.rs @@ -19,10 +19,7 @@ impl Settings for ServiceSettings { } fn default_cors_origins() -> Vec { - vec![ - "http://localhost:3000".to_string(), - "http://localhost:3001".to_string(), - ] + vec!["http://localhost:8080".to_string()] } fn default_wg_network() -> IpAddrMask { diff --git a/src/bin/api/http/handlers/metrics.rs b/src/bin/api/http/handlers/metrics.rs index 5737d727..0dbb92c9 100644 --- a/src/bin/api/http/handlers/metrics.rs +++ b/src/bin/api/http/handlers/metrics.rs @@ -1,3 +1,4 @@ +use crate::http::StatusCode; use chrono::Utc; use futures::{SinkExt, StreamExt}; use std::collections::BTreeMap; @@ -5,6 +6,58 @@ use std::sync::Arc; use fcore::MetricStorage; +/// Debug endpoint +pub async fn debug_metrics_handler( + metrics: Arc, +) -> Result { + let mut debug_info = serde_json::json!({ + "total_nodes": metrics.inner.len(), + "nodes": [] + }); + + let nodes_array = debug_info.as_object_mut().unwrap(); + let mut nodes_list = Vec::new(); + + for node_ref in metrics.inner.iter() { + let node_id = node_ref.key(); + let node_metrics = node_ref.value(); + + let mut metrics_list = Vec::new(); + + for metric_entry in node_metrics.iter() { + let series_hash = metric_entry.key(); + let points = metric_entry.value(); + + let (name, tags) = match metrics.metadata.get(series_hash) { + Some(meta) => (meta.0.clone(), meta.1.clone()), + None => ("unknown".to_string(), BTreeMap::new()), + }; + + metrics_list.push(serde_json::json!({ + "hash": series_hash.to_string(), + "name": name, + "tags": tags, + "points_count": points.len(), + "last_value": points.back().map(|p| p.value), + "last_timestamp": points.back().map(|p| p.timestamp), + })); + } + + nodes_list.push(serde_json::json!({ + "node_id": node_id, + "metrics_count": metrics_list.len(), + "metrics": metrics_list, + })); + } + + nodes_array.insert("nodes".to_string(), serde_json::Value::Array(nodes_list)); + + Ok(warp::reply::with_status( + warp::reply::json(&debug_info), + StatusCode::OK, + )) +} + pub async fn handle_ws_client( socket: warp::ws::WebSocket, node_id: uuid::Uuid, diff --git a/src/bin/api/http/handlers/node.rs b/src/bin/api/http/handlers/node.rs index 02b57115..42400a1d 100644 --- a/src/bin/api/http/handlers/node.rs +++ b/src/bin/api/http/handlers/node.rs @@ -9,7 +9,7 @@ use fcore::{ use super::super::{ super::sync::{tasks::SyncOp, MemSync}, - param::{NodeIdParam, NodesQueryParams}, + param::NodesQueryParams, request::NodeRequest, }; @@ -133,10 +133,6 @@ where (val.0.clone(), val.1.clone()) })?; - if !(name.starts_with("sys.") || name.starts_with("net.")) { - return None; - } - Some(NodeMetricInfo { key: series_hash.to_string(), name, @@ -179,11 +175,11 @@ where } } -/// Get of a node handler -// GET /node?id= +/// Get single node handler - ИСПРАВЛЕНАЯ ВЕРСИЯ pub async fn get_node_handler( - node_param: NodeIdParam, + node_id: uuid::Uuid, memory: MemSync, + metrics: Arc, // ДОБАВЛЯЕМ metrics параметр! ) -> Result where N: NodeStorageOperations + Sync + Send + Clone + 'static, @@ -199,12 +195,47 @@ where { let mem = memory.memory.read().await; - if let Some(node) = mem.nodes.get_by_id(&node_param.id) { + if let Some(node) = mem.nodes.get_by_id(&node_id) { + let mut res = node.as_node_response(); + + res.metrics = if let Some(node_metrics_map) = metrics.inner.get(&node_id) { + node_metrics_map + .iter() + .filter_map(|entry| { + let series_hash = entry.key(); + let points = entry.value(); + + if points.is_empty() { + return None; + } + + let (name, tags) = match metrics.metadata.get(series_hash) { + Some(meta) => { + let val = meta.value(); + (val.0.clone(), val.1.clone()) + } + None => return None, + }; + + Some(NodeMetricInfo { + key: series_hash.to_string(), + name, + tags, + }) + }) + .collect() + } else { + vec![] + }; + + tracing::info!("Node {} has {} metrics", node_id, res.metrics.len()); + let response = ResponseMessage::> { status: StatusCode::OK.as_u16(), - message: format!("Node {}", node_param.id), - response: Some(node.as_node_response()), + message: format!("Node {} with {} metrics", node_id, res.metrics.len()), + response: Some(res), }; + Ok(warp::reply::with_status( warp::reply::json(&response), StatusCode::OK, diff --git a/src/bin/api/http/mod.rs b/src/bin/api/http/mod.rs index d7bf09fb..09d09ea6 100644 --- a/src/bin/api/http/mod.rs +++ b/src/bin/api/http/mod.rs @@ -14,7 +14,9 @@ struct JsonError(String); impl reject::Reject for JsonError {} pub async fn rejection(reject: Rejection) -> Result { + tracing::debug!("[REJECTION] Request rejected: {:?}", reject); if reject.find::().is_some() { + tracing::debug!("[REJECTION] Reason: Method Not Allowed"); let error_response = warp::reply::json(&serde_json::json!({ "error": "Method Not Allowed" })); @@ -23,6 +25,7 @@ pub async fn rejection(reject: Rejection) -> Result { StatusCode::METHOD_NOT_ALLOWED, )) } else if reject.find::().is_some() { + tracing::debug!("[REJECTION] Reason UNAUTHORIZED"); let error_response = warp::reply::json(&serde_json::json!({ "error": "UNAUTHORIZED" })); @@ -31,6 +34,8 @@ pub async fn rejection(reject: Rejection) -> Result { StatusCode::UNAUTHORIZED, )) } else if let Some(err) = reject.find::() { + tracing::debug!("[REJECTION] Reason BAD_REQUEST"); + let error_response = warp::reply::json(&serde_json::json!({ "error": err.0 })); @@ -39,6 +44,7 @@ pub async fn rejection(reject: Rejection) -> Result { StatusCode::BAD_REQUEST, )) } else if reject.is_not_found() { + tracing::debug!("[REJECTION] Reason Not_Found"); let error_response = warp::reply::json(&serde_json::json!({ "error": "Not Found" })); diff --git a/src/bin/api/http/param.rs b/src/bin/api/http/param.rs index 4a413653..a94a7c86 100644 --- a/src/bin/api/http/param.rs +++ b/src/bin/api/http/param.rs @@ -12,10 +12,6 @@ pub struct SubIdQueryParam { pub struct NodesQueryParams { pub env: Option, } -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct NodeIdParam { - pub id: uuid::Uuid, -} #[derive(Clone, Debug, Deserialize, Serialize)] pub struct ConnQueryParam { diff --git a/src/bin/api/http/routes.rs b/src/bin/api/http/routes.rs index 8df2ec3b..deba2d6d 100644 --- a/src/bin/api/http/routes.rs +++ b/src/bin/api/http/routes.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use std::sync::Arc; +use uuid::Uuid; use warp::Filter; use fcore::{ @@ -84,12 +85,10 @@ where .and(with_sync(self.sync.clone())) .and_then(post_node_handler); - let get_node_route = warp::get() - .and(warp::path("node")) - .and(warp::path::end()) - .and(auth.clone()) - .and(warp::query::()) + let get_node_route = warp::path!("node" / Uuid) + .and(warp::get()) .and(with_sync(self.sync.clone())) + .and(with_metrics(self.metrics.clone())) .and_then(get_node_handler); let get_subscription_route = warp::get() @@ -215,7 +214,6 @@ where .and(with_i64(params.trial_limit_bytes)) .and_then(post_trial_handler); - use uuid::Uuid; let ws_all_metrics_route = warp::path!("metrics" / "all" / Uuid / u64 / "ws") .and(warp::ws()) .and(with_metrics(self.metrics.clone())) @@ -243,6 +241,11 @@ where }, ); + let debug_metrics_route = warp::path!("debug" / "metrics") + .and(warp::get()) + .and(with_metrics(self.metrics.clone())) + .and_then(debug_metrics_handler); + let routes = get_healthcheck_route // Subscription .or(get_subscription_route) @@ -269,6 +272,7 @@ where // Metrics .or(ws_all_metrics_route) .or(ws_aggregate_route) + .or(debug_metrics_route) .recover(rejection) .with(cors); diff --git a/src/metrics/storage.rs b/src/metrics/storage.rs index ae0922c8..02adeec6 100644 --- a/src/metrics/storage.rs +++ b/src/metrics/storage.rs @@ -97,6 +97,14 @@ impl MetricStorage { } } pub fn insert_envelope(&self, e: MetricEnvelope) { + tracing::info!( + "Inserting metric: node={}, name={}, value={}, tags={:?}", + e.node_id, + e.name, + e.value, + e.tags + ); + let key = Self::make_series_key(&e.name, &e.tags); self.metadata.entry(key).or_insert_with(|| { @@ -146,7 +154,6 @@ impl MetricStorage { } hasher.finish() } - pub fn get_range( &self, node_id: &uuid::Uuid,