Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/bin/api/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ impl Settings for ServiceSettings {
}

fn default_cors_origins() -> Vec<String> {
vec![
"http://localhost:3000".to_string(),
"http://localhost:3001".to_string(),
]
vec!["http://localhost:8080".to_string()]
}

fn default_wg_network() -> IpAddrMask {
Expand Down
53 changes: 53 additions & 0 deletions src/bin/api/http/handlers/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,63 @@
use crate::http::StatusCode;
use chrono::Utc;
use futures::{SinkExt, StreamExt};
use std::collections::BTreeMap;
use std::sync::Arc;

use fcore::MetricStorage;

/// Debug endpoint
pub async fn debug_metrics_handler(
metrics: Arc<MetricStorage>,
) -> Result<impl warp::Reply, warp::Rejection> {
let mut debug_info = serde_json::json!({
"total_nodes": metrics.inner.len(),
"nodes": []
});

let nodes_array = debug_info.as_object_mut().unwrap();
let mut nodes_list = Vec::new();

for node_ref in metrics.inner.iter() {
let node_id = node_ref.key();
let node_metrics = node_ref.value();

let mut metrics_list = Vec::new();

for metric_entry in node_metrics.iter() {
let series_hash = metric_entry.key();
let points = metric_entry.value();

let (name, tags) = match metrics.metadata.get(series_hash) {
Some(meta) => (meta.0.clone(), meta.1.clone()),
None => ("unknown".to_string(), BTreeMap::new()),
};

metrics_list.push(serde_json::json!({
"hash": series_hash.to_string(),
"name": name,
"tags": tags,
"points_count": points.len(),
"last_value": points.back().map(|p| p.value),
"last_timestamp": points.back().map(|p| p.timestamp),
}));
}

nodes_list.push(serde_json::json!({
"node_id": node_id,
"metrics_count": metrics_list.len(),
"metrics": metrics_list,
}));
}

nodes_array.insert("nodes".to_string(), serde_json::Value::Array(nodes_list));

Ok(warp::reply::with_status(
warp::reply::json(&debug_info),
StatusCode::OK,
))
}

pub async fn handle_ws_client(
socket: warp::ws::WebSocket,
node_id: uuid::Uuid,
Expand Down
53 changes: 42 additions & 11 deletions src/bin/api/http/handlers/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use fcore::{

use super::super::{
super::sync::{tasks::SyncOp, MemSync},
param::{NodeIdParam, NodesQueryParams},
param::NodesQueryParams,
request::NodeRequest,
};

Expand Down Expand Up @@ -133,10 +133,6 @@ where
(val.0.clone(), val.1.clone())
})?;

if !(name.starts_with("sys.") || name.starts_with("net.")) {
return None;
}

Some(NodeMetricInfo {
key: series_hash.to_string(),
name,
Expand Down Expand Up @@ -179,11 +175,11 @@ where
}
}

/// Get of a node handler
// GET /node?id=
/// Get single node handler - ИСПРАВЛЕНАЯ ВЕРСИЯ
pub async fn get_node_handler<N, C, S>(
node_param: NodeIdParam,
node_id: uuid::Uuid,
memory: MemSync<N, C, S>,
metrics: Arc<MetricStorage>, // ДОБАВЛЯЕМ metrics параметр!
) -> Result<impl warp::Reply, warp::Rejection>
where
N: NodeStorageOperations + Sync + Send + Clone + 'static,
Expand All @@ -199,12 +195,47 @@ where
{
let mem = memory.memory.read().await;

if let Some(node) = mem.nodes.get_by_id(&node_param.id) {
if let Some(node) = mem.nodes.get_by_id(&node_id) {
let mut res = node.as_node_response();

res.metrics = if let Some(node_metrics_map) = metrics.inner.get(&node_id) {
node_metrics_map
.iter()
.filter_map(|entry| {
let series_hash = entry.key();
let points = entry.value();

if points.is_empty() {
return None;
}

let (name, tags) = match metrics.metadata.get(series_hash) {
Some(meta) => {
let val = meta.value();
(val.0.clone(), val.1.clone())
}
None => return None,
};

Some(NodeMetricInfo {
key: series_hash.to_string(),
name,
tags,
})
})
.collect()
} else {
vec![]
};

tracing::info!("Node {} has {} metrics", node_id, res.metrics.len());

let response = ResponseMessage::<Option<NodeResponse>> {
status: StatusCode::OK.as_u16(),
message: format!("Node {}", node_param.id),
response: Some(node.as_node_response()),
message: format!("Node {} with {} metrics", node_id, res.metrics.len()),
response: Some(res),
};

Ok(warp::reply::with_status(
warp::reply::json(&response),
StatusCode::OK,
Expand Down
6 changes: 6 additions & 0 deletions src/bin/api/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ struct JsonError(String);
impl reject::Reject for JsonError {}

pub async fn rejection(reject: Rejection) -> Result<impl Reply, Rejection> {
tracing::debug!("[REJECTION] Request rejected: {:?}", reject);
if reject.find::<MethodError>().is_some() {
tracing::debug!("[REJECTION] Reason: Method Not Allowed");
let error_response = warp::reply::json(&serde_json::json!({
"error": "Method Not Allowed"
}));
Expand All @@ -23,6 +25,7 @@ pub async fn rejection(reject: Rejection) -> Result<impl Reply, Rejection> {
StatusCode::METHOD_NOT_ALLOWED,
))
} else if reject.find::<AuthError>().is_some() {
tracing::debug!("[REJECTION] Reason UNAUTHORIZED");
let error_response = warp::reply::json(&serde_json::json!({
"error": "UNAUTHORIZED"
}));
Expand All @@ -31,6 +34,8 @@ pub async fn rejection(reject: Rejection) -> Result<impl Reply, Rejection> {
StatusCode::UNAUTHORIZED,
))
} else if let Some(err) = reject.find::<JsonError>() {
tracing::debug!("[REJECTION] Reason BAD_REQUEST");

let error_response = warp::reply::json(&serde_json::json!({
"error": err.0
}));
Expand All @@ -39,6 +44,7 @@ pub async fn rejection(reject: Rejection) -> Result<impl Reply, Rejection> {
StatusCode::BAD_REQUEST,
))
} else if reject.is_not_found() {
tracing::debug!("[REJECTION] Reason Not_Found");
let error_response = warp::reply::json(&serde_json::json!({
"error": "Not Found"
}));
Expand Down
4 changes: 0 additions & 4 deletions src/bin/api/http/param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ pub struct SubIdQueryParam {
pub struct NodesQueryParams {
pub env: Option<String>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NodeIdParam {
pub id: uuid::Uuid,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ConnQueryParam {
Expand Down
16 changes: 10 additions & 6 deletions src/bin/api/http/routes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use std::sync::Arc;
use uuid::Uuid;
use warp::Filter;

use fcore::{
Expand Down Expand Up @@ -84,12 +85,10 @@ where
.and(with_sync(self.sync.clone()))
.and_then(post_node_handler);

let get_node_route = warp::get()
.and(warp::path("node"))
.and(warp::path::end())
.and(auth.clone())
.and(warp::query::<NodeIdParam>())
let get_node_route = warp::path!("node" / Uuid)
.and(warp::get())
.and(with_sync(self.sync.clone()))
.and(with_metrics(self.metrics.clone()))
.and_then(get_node_handler);

let get_subscription_route = warp::get()
Expand Down Expand Up @@ -215,7 +214,6 @@ where
.and(with_i64(params.trial_limit_bytes))
.and_then(post_trial_handler);

use uuid::Uuid;
let ws_all_metrics_route = warp::path!("metrics" / "all" / Uuid / u64 / "ws")
.and(warp::ws())
.and(with_metrics(self.metrics.clone()))
Expand Down Expand Up @@ -243,6 +241,11 @@ where
},
);

let debug_metrics_route = warp::path!("debug" / "metrics")
.and(warp::get())
.and(with_metrics(self.metrics.clone()))
.and_then(debug_metrics_handler);

let routes = get_healthcheck_route
// Subscription
.or(get_subscription_route)
Expand All @@ -269,6 +272,7 @@ where
// Metrics
.or(ws_all_metrics_route)
.or(ws_aggregate_route)
.or(debug_metrics_route)
.recover(rejection)
.with(cors);

Expand Down
9 changes: 8 additions & 1 deletion src/metrics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ impl MetricStorage {
}
}
pub fn insert_envelope(&self, e: MetricEnvelope) {
tracing::info!(
"Inserting metric: node={}, name={}, value={}, tags={:?}",
e.node_id,
e.name,
e.value,
e.tags
);

let key = Self::make_series_key(&e.name, &e.tags);

self.metadata.entry(key).or_insert_with(|| {
Expand Down Expand Up @@ -146,7 +154,6 @@ impl MetricStorage {
}
hasher.finish()
}

pub fn get_range(
&self,
node_id: &uuid::Uuid,
Expand Down
Loading