diff --git a/Examples/run-with-docker/docker-compose.yaml b/Examples/run-with-docker/docker-compose.yaml new file mode 100644 index 00000000..a77444cc --- /dev/null +++ b/Examples/run-with-docker/docker-compose.yaml @@ -0,0 +1,109 @@ +services: + cortexflow-metrics: + image: lorenzotettamanti/cortexflow-metrics:otel-test-15 + container_name: cortexflow-metrics + stop_signal: SIGINT + restart: unless-stopped + privileged: true + pid: host + init: true + command: ["/usr/local/bin/cortexflow-metrics"] + environment: + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 + - OTEL_EXPORTER_OTLP_PROTOCOL=grpc + volumes: + - type: bind + source: /sys/fs/bpf + target: /sys/fs/bpf + bind: + propagation: shared + - /lib/modules:/lib/modules:ro + - /sys/kernel/debug:/sys/kernel/debug + - /proc:/host/proc:ro + networks: + - cortexflow + + #cortexflow-identity: + # image: lorenzotettamanti/cortexflow-identity:0.1.2 + # container_name: cortexflow-identity + # stop_signal: SIGINT + # restart: unless-stopped + # privileged: true + # pid: host + # init: true + # command: ["/usr/local/bin/cortexflow-identity-service"] + # volumes: + # - type: bind + # source: /sys/fs/bpf + # target: /sys/fs/bpf + # bind: + # propagation: shared + # - /lib/modules:/lib/modules:ro + # - /sys/kernel/debug:/sys/kernel/debug + # - /proc:/host/proc:ro + # networks: + # - cortexflow + + otel-collector: + image: otel/opentelemetry-collector:0.95.0 + container_name: otel-collector + command: + - "--config=/conf/otel-collector-config.yaml" + ports: + - "4317:4317" + - "4318:4318" + - "8889:8889" + environment: + - GOMEMLIMIT=1600MiB + volumes: + - ./otel-collector-config.yaml:/conf/otel-collector-config.yaml:ro + networks: + - cortexflow + depends_on: + # - cortexflow-identity + - cortexflow-metrics + + prometheus: + image: prom/prometheus:v2.51.2 + container_name: prometheus + user: "65534:65534" + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - 
"--web.enable-lifecycle" + ports: + - "9090:9090" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus-data:/prometheus + networks: + - cortexflow + depends_on: + - otel-collector + + grafana: + image: grafana/grafana:latest + container_name: grafana + user: "472:472" + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_USERS_ALLOW_SIGN_UP=false + - GF_AUTH_ANONYMOUS_ENABLED=false + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning:ro + - ./grafana/dashboards:/var/lib/grafana/dashboards:ro + - grafana-data:/var/lib/grafana + networks: + - cortexflow + depends_on: + - prometheus + +networks: + cortexflow: + +volumes: + prometheus-data: + grafana-data: diff --git a/Examples/run-with-docker/grafana/dashboards/cortexbrain-dashboard.json b/Examples/run-with-docker/grafana/dashboards/cortexbrain-dashboard.json new file mode 100644 index 00000000..89e182fb --- /dev/null +++ b/Examples/run-with-docker/grafana/dashboards/cortexbrain-dashboard.json @@ -0,0 +1,605 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + 
"lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "builder", + "expr": "cortexbrain_packets_total", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Total packets", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 25, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 2, + 
"options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "builder", + "expr": "cortexbrain_cpu_bytes_alloc", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Cpu bytes allocation", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 25, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 4, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "builder", + "expr": "cortexbrain_enter_mem_alloc", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Mem 
Alloc size", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "builder", + "expr": "cortexbrain_mem_alloc_events_total", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Mem alloc tot events", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 16 + }, + "id": 6, + "options": { + 
"displayMode": "gradient", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "maxVizHeight": 300, + "minVizHeight": 16, + "minVizWidth": 8, + "namePlacement": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showUnfilled": true, + "sizing": "auto", + "valueMode": "color" + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "builder", + "expr": "cortexbrain_sched_stat_runtime", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Scheduler Runtime (us)", + "transparent": true, + "type": "bargauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "percentage", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 16, + "x": 8, + "y": 16 + }, + "id": 3, + "options": { + "displayMode": "gradient", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "maxVizHeight": 300, + "minVizHeight": 16, + "minVizWidth": 8, + "namePlacement": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showUnfilled": true, + "sizing": "auto", + "valueMode": "color" + }, + "pluginVersion": "10.4.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "editorMode": "builder", + "expr": "cortexbrain_cpu_bytes_alloc", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Timestamp microseconds histograms", + "transparent": true, + "type": "bargauge" + } + ], + "preload": false, + "refresh": 
"", + "schemaVersion": 39, + "tags": [], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "Prometheus", + "value": "prometheus" + }, + "hide": 0, + "includeAll": false, + "label": "datasource", + "multi": false, + "name": "datasource", + "options": [], + "query": "prometheus", + "queryValue": "", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + } + ] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ] + }, + "timezone": "browser", + "title": "CortexBrain Dashboard", + "uid": "cortexbrain-main", + "version": 1 +} diff --git a/Examples/run-with-docker/grafana/provisioning/dashboards/dashboards.yml b/Examples/run-with-docker/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 00000000..0bcf3d81 --- /dev/null +++ b/Examples/run-with-docker/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards diff --git a/Examples/run-with-docker/grafana/provisioning/datasources/prometheus.yml b/Examples/run-with-docker/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 00000000..bb009bb2 --- /dev/null +++ b/Examples/run-with-docker/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,9 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false diff --git a/Examples/run-with-docker/otel-collector-config.yaml b/Examples/run-with-docker/otel-collector-config.yaml new file mode 100644 index 00000000..91cae017 --- /dev/null +++ b/Examples/run-with-docker/otel-collector-config.yaml @@ -0,0 +1,34 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: 
+ endpoint: 0.0.0.0:4318 + +processors: + memory_limiter: + limit_mib: 1500 + spike_limit_mib: 512 + check_interval: 5s + +exporters: + logging: {} + prometheus: + endpoint: 0.0.0.0:8889 + namespace: cortexbrain + +service: + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter] + exporters: [logging] + logs: + receivers: [otlp] + processors: [memory_limiter] + exporters: [logging] + metrics: + receivers: [otlp] + processors: [memory_limiter] + exporters: [logging, prometheus] diff --git a/Examples/run-with-docker/prometheus.yml b/Examples/run-with-docker/prometheus.yml new file mode 100644 index 00000000..faeda702 --- /dev/null +++ b/Examples/run-with-docker/prometheus.yml @@ -0,0 +1,8 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: "otel-collector" + static_configs: + - targets: ["otel-collector:8889"] diff --git a/core/common/src/buffer_type.rs b/core/common/src/buffer_type.rs index 45d82c81..7124e55f 100644 --- a/core/common/src/buffer_type.rs +++ b/core/common/src/buffer_type.rs @@ -128,6 +128,64 @@ pub struct TimeStampMetrics { } #[cfg(feature = "monitoring-structs")] unsafe impl aya::Pod for TimeStampMetrics {} +/// Perf-buffer event carrying an allocated byte count, the reporting `pid` and its command name. +#[cfg(feature = "monitoring-structs")] +#[repr(C, packed)] +#[derive(Clone, Copy, Zeroable)] +pub struct CpuFrequency { + //pub cpu_id: u32, + //pub cpu_freq: u32, + pub bytes_alloc: u32, + pub pid: u32, + pub command: [u8; 16], +} +// Gated like the struct itself so builds without "monitoring-structs" still compile. +#[cfg(feature = "monitoring-structs")] +unsafe impl aya::Pod for CpuFrequency {} + +#[cfg(feature = "monitoring-structs")] +#[repr(C, packed)] +#[derive(Clone, Copy, Zeroable)] +pub struct MemAlloc { + pub tgid: u32, + pub length: u64, + pub addr: u64, + pub command: [u8; TASK_COMM_LEN], +} +#[cfg(feature = "monitoring-structs")] +unsafe impl aya::Pod for MemAlloc {} + +#[cfg(feature = "monitoring-structs")] +#[repr(C, packed)] +#[derive(Clone, Copy, Zeroable)] +pub struct SchedStatWait { + pub tgid: u32, + pub delay: u64, + pub command: [u8; TASK_COMM_LEN], +} +#[cfg(feature = 
"monitoring-structs")] +unsafe impl aya::Pod for SchedStatWait {} + +#[cfg(feature = "monitoring-structs")] +#[repr(C, packed)] +#[derive(Clone, Copy, Zeroable)] +pub struct SchedStatRuntime { + pub tgid: u32, + pub runtime: u64, + pub command: [u8; TASK_COMM_LEN], +} +#[cfg(feature = "monitoring-structs")] +unsafe impl aya::Pod for SchedStatRuntime {} + +#[cfg(feature = "monitoring-structs")] +#[repr(C, packed)] +#[derive(Clone, Copy, Zeroable)] +pub struct CpuIdle { + pub cpu_id: u32, + pub state: u32, +} +#[cfg(feature = "monitoring-structs")] +unsafe impl aya::Pod for CpuIdle {} // docs: // This function perform a byte swap from little-endian to big-endian @@ -156,10 +211,18 @@ pub enum BufferType { NetworkMetrics, #[cfg(feature = "monitoring-structs")] TimeStampMetrics, + #[cfg(feature = "monitoring-structs")] + CpuFrequency, + #[cfg(feature = "monitoring-structs")] + MemAlloc, + #[cfg(feature = "monitoring-structs")] + SchedStatWait, + #[cfg(feature = "monitoring-structs")] + SchedStatRuntime, + #[cfg(feature = "monitoring-structs")] + CpuIdle, } -// IDEA: this is an experimental implementation to centralize buffer reading logic -// TODO: add variant for cortexflow API exporter #[cfg(feature = "buffer-reader")] impl BufferType { #[cfg(feature = "network-structs")] @@ -476,6 +539,246 @@ impl BufferType { } } } + + /// Decodes entries `offset..tot_events` of `buffers` as [`CpuFrequency`] events. + /// Entries shorter than the struct are logged as corrupted and skipped. With the + /// "otlp" exporter each event is recorded via `metrics` and logged; other exporters skip it. + #[cfg(feature = "monitoring-structs")] + pub async fn read_cpu_frequency( + buffers: &mut [BytesMut], + tot_events: i32, + offset: i32, + exporter: &str, + metrics: Arc<Metrics>, + ) { + for i in offset..tot_events { + let vec_bytes = &buffers[i as usize]; + if vec_bytes.len() < std::mem::size_of::<CpuFrequency>() { + error!( + "Corrupted Cpu Frequency Metrics data. Raw data: {}. 
Readed {} bytes expected {} bytes", + vec_bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::<Vec<_>>() + .join(" "), + vec_bytes.len(), + std::mem::size_of::<CpuFrequency>() + ); + continue; + } + if vec_bytes.len() >= std::mem::size_of::<CpuFrequency>() { + let cpu_freq_metrics: CpuFrequency = + unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + + match exporter { + "otlp" => metrics.record_cpu_bytes_alloc(&cpu_freq_metrics), + _ => continue, + } + + //let cpu_id = cpu_freq_metrics.cpu_id; + //let cpu_freq = cpu_freq_metrics.cpu_freq; + let bytes_alloc = cpu_freq_metrics.bytes_alloc; + //info!( + // "Cpu id: {} Cpu frequency: {} Bytes alloc: {}", + // cpu_id, cpu_freq, bytes_alloc + //); + let pid = cpu_freq_metrics.pid; + let command = String::from_utf8_lossy(&cpu_freq_metrics.command); + info!( + "Cpu Bytes alloc: {} pid : {} command: {}", + bytes_alloc, pid, command + ); + } + } + } + + /// Decodes entries `offset..tot_events` of `buffers` as [`MemAlloc`] events. + /// Entries shorter than the struct are logged as corrupted and skipped. With the + /// "otlp" exporter each event is recorded via `metrics` and logged; other exporters skip it. + #[cfg(feature = "monitoring-structs")] + pub async fn read_mem_alloc( + buffers: &mut [BytesMut], + tot_events: i32, + offset: i32, + exporter: &str, + metrics: Arc<Metrics>, + ) { + for i in offset..tot_events { + let vec_bytes = &buffers[i as usize]; + if vec_bytes.len() < std::mem::size_of::<MemAlloc>() { + error!( + "Corrupted MemAlloc data. Raw data: {}. 
Readed {} bytes expected {} bytes", + vec_bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::<Vec<_>>() + .join(" "), + vec_bytes.len(), + std::mem::size_of::<MemAlloc>() + ); + continue; + } + if vec_bytes.len() >= std::mem::size_of::<MemAlloc>() { + let mem_alloc: MemAlloc = + unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + + match exporter { + "otlp" => metrics.record_enter_mem_alloc(&mem_alloc), + _ => continue, + } + + let tgid = mem_alloc.tgid; + let command = String::from_utf8_lossy(&mem_alloc.command); + let addr = mem_alloc.addr; + let length = mem_alloc.length; + + info!( + "MemAlloc - tgid: {}, command: {}, addr: {}, length: {}", + tgid, command, addr, length + ); + } + } + } + + /// Decodes entries `offset..tot_events` of `buffers` as [`SchedStatWait`] events. + /// Entries shorter than the struct are logged as corrupted and skipped. With the + /// "otlp" exporter each event is recorded via `metrics` and logged; other exporters skip it. + #[cfg(feature = "monitoring-structs")] + pub async fn read_sched_stat_wait( + buffers: &mut [BytesMut], + tot_events: i32, + offset: i32, + exporter: &str, + metrics: Arc<Metrics>, + ) { + for i in offset..tot_events { + let vec_bytes = &buffers[i as usize]; + if vec_bytes.len() < std::mem::size_of::<SchedStatWait>() { + error!( + "Corrupted SchedStatWait data. Raw data: {}. 
Readed {} bytes expected {} bytes", + vec_bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::<Vec<_>>() + .join(" "), + vec_bytes.len(), + std::mem::size_of::<SchedStatWait>() + ); + continue; + } + if vec_bytes.len() >= std::mem::size_of::<SchedStatWait>() { + let sched_stat_wait: SchedStatWait = + unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + + match exporter { + "otlp" => metrics.record_sched_stat_wait(&sched_stat_wait), + _ => continue, + } + + let tgid = sched_stat_wait.tgid; + let command = String::from_utf8_lossy(&sched_stat_wait.command); + let delay = sched_stat_wait.delay; + + info!( + "SchedStatWait - tgid: {}, command: {}, delay: {}", + tgid, command, delay + ); + } + } + } + + /// Decodes entries `offset..tot_events` of `buffers` as [`SchedStatRuntime`] events. + /// Entries shorter than the struct are logged as corrupted and skipped. With the + /// "otlp" exporter each event is recorded via `metrics` and logged; other exporters skip it. + #[cfg(feature = "monitoring-structs")] + pub async fn read_sched_stat_runtime( + buffers: &mut [BytesMut], + tot_events: i32, + offset: i32, + exporter: &str, + metrics: Arc<Metrics>, + ) { + for i in offset..tot_events { + let vec_bytes = &buffers[i as usize]; + if vec_bytes.len() < std::mem::size_of::<SchedStatRuntime>() { + error!( + "Corrupted SchedStatRuntime data. Raw data: {}. 
Readed {} bytes expected {} bytes", + vec_bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::<Vec<_>>() + .join(" "), + vec_bytes.len(), + std::mem::size_of::<SchedStatRuntime>() + ); + continue; + } + if vec_bytes.len() >= std::mem::size_of::<SchedStatRuntime>() { + let sched_stat_runtime: SchedStatRuntime = + unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + + match exporter { + "otlp" => metrics.record_sched_stat_runtime(&sched_stat_runtime), + _ => continue, + } + + let tgid = sched_stat_runtime.tgid; + let command = String::from_utf8_lossy(&sched_stat_runtime.command); + let runtime = sched_stat_runtime.runtime; + + info!( + "SchedStatRuntime - tgid: {}, command: {}, runtime: {}", + tgid, command, runtime + ); + } + } + } + + /// Decodes entries `offset..tot_events` of `buffers` as [`CpuIdle`] events. + /// Entries shorter than the struct are logged as corrupted and skipped. With the + /// "otlp" exporter each event is recorded via `metrics` and logged; other exporters skip it. + #[cfg(feature = "monitoring-structs")] + pub async fn read_cpu_idle( + buffers: &mut [BytesMut], + tot_events: i32, + offset: i32, + exporter: &str, + metrics: Arc<Metrics>, + ) { + for i in offset..tot_events { + let vec_bytes = &buffers[i as usize]; + if vec_bytes.len() < std::mem::size_of::<CpuIdle>() { + error!( + "Corrupted CpuIdle data. Raw data: {}. 
Readed {} bytes expected {} bytes", + vec_bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::<Vec<_>>() + .join(" "), + vec_bytes.len(), + std::mem::size_of::<CpuIdle>() + ); + continue; + } + if vec_bytes.len() >= std::mem::size_of::<CpuIdle>() { + let cpu_idle: CpuIdle = + unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + + match exporter { + "otlp" => metrics.record_cpu_idle(&cpu_idle), + _ => continue, + } + + let cpu_id = cpu_idle.cpu_id; + let state = cpu_idle.state; + + info!( + "CpuIdle state changed - cpu_id: {}, state: {}", + cpu_id, state + ); + } + } + } } // docs: read buffer function: @@ -548,6 +836,63 @@ pub async fn read_perf_buffer>( ) .await } + #[cfg(feature = "monitoring-structs")] + BufferType::CpuFrequency => { + BufferType::read_cpu_frequency( + &mut buffers, + tot_events, + offset, + "otlp", + metrics.clone().expect("Metric required for CpuFrequency"), + ) + .await + } + #[cfg(feature = "monitoring-structs")] + BufferType::MemAlloc => { + BufferType::read_mem_alloc( + &mut buffers, + tot_events, + offset, + "otlp", + metrics.clone().expect("Metric required for MemAlloc"), + ) + .await + } + #[cfg(feature = "monitoring-structs")] + BufferType::SchedStatWait => { + BufferType::read_sched_stat_wait( + &mut buffers, + tot_events, + offset, + "otlp", + metrics.clone().expect("Metric required for SchedStatWait"), + ) + .await + } + #[cfg(feature = "monitoring-structs")] + BufferType::SchedStatRuntime => { + BufferType::read_sched_stat_runtime( + &mut buffers, + tot_events, + offset, + "otlp", + metrics + .clone() + .expect("Metric required for SchedStatRuntime"), + ) + .await + } + #[cfg(feature = "monitoring-structs")] + BufferType::CpuIdle => { + BufferType::read_cpu_idle( + &mut buffers, + tot_events, + offset, + "otlp", + metrics.clone().expect("Metric required for CpuIdle"), + ) + .await + } } } } @@ -572,6 +917,16 @@ pub enum BufferSize { NetworkMetricsEvents, #[cfg(feature = "monitoring-structs")] TimeMetricsEvents, + #[cfg(feature = 
"monitoring-structs")] + CpuFrequency, + #[cfg(feature = "monitoring-structs")] + MemAlloc, + #[cfg(feature = "monitoring-structs")] + SchedStatWait, + #[cfg(feature = "monitoring-structs")] + SchedStatRuntime, + #[cfg(feature = "monitoring-structs")] + CpuIdle, } #[cfg(feature = "buffer-reader")] impl BufferSize { @@ -587,6 +942,16 @@ impl BufferSize { BufferSize::NetworkMetricsEvents => std::mem::size_of::(), #[cfg(feature = "monitoring-structs")] BufferSize::TimeMetricsEvents => std::mem::size_of::(), + #[cfg(feature = "monitoring-structs")] + BufferSize::CpuFrequency => std::mem::size_of::(), + #[cfg(feature = "monitoring-structs")] + BufferSize::MemAlloc => std::mem::size_of::(), + #[cfg(feature = "monitoring-structs")] + BufferSize::SchedStatWait => std::mem::size_of::(), + #[cfg(feature = "monitoring-structs")] + BufferSize::SchedStatRuntime => std::mem::size_of::(), + #[cfg(feature = "monitoring-structs")] + BufferSize::CpuIdle => std::mem::size_of::(), } } pub fn set_buffer(&self) -> Vec { @@ -630,6 +995,31 @@ impl BufferSize { let capacity = self.get_size() * 1024; return vec![BytesMut::with_capacity(capacity); tot_cpu]; } + #[cfg(feature = "monitoring-structs")] + BufferSize::CpuFrequency => { + let capacity = self.get_size() * 1024; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "monitoring-structs")] + BufferSize::MemAlloc => { + let capacity = self.get_size() * 1024; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "monitoring-structs")] + BufferSize::SchedStatWait => { + let capacity = self.get_size() * 1024; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "monitoring-structs")] + BufferSize::SchedStatRuntime => { + let capacity = self.get_size() * 1024; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "monitoring-structs")] + BufferSize::CpuIdle => { + let capacity = self.get_size() * 1024; + return 
vec![BytesMut::with_capacity(capacity); tot_cpu]; + } } } } diff --git a/core/common/src/otel_metrics.rs b/core/common/src/otel_metrics.rs index f123b3b4..79d08b88 100644 --- a/core/common/src/otel_metrics.rs +++ b/core/common/src/otel_metrics.rs @@ -11,7 +11,10 @@ //! extracted from the eBPF struct, allowing downstream collectors to group //! telemetry by process. -use crate::buffer_type::{NetworkMetrics, TimeStampMetrics}; +use crate::buffer_type::{ + CpuFrequency, CpuIdle, MemAlloc, NetworkMetrics, SchedStatRuntime, SchedStatWait, + TimeStampMetrics, +}; use opentelemetry::KeyValue; use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter}; pub struct Metrics { @@ -35,6 +38,27 @@ pub struct Metrics { /// Histogram of `ts_us` values seen in both `net_metrics` and /// `time_stamp_events`. pub ts_us: Histogram, + + /// Cpu bytes alloc total events + pub cpu_bytes_alloc_events_total: Counter, + + /// Cpu bytes allocation + pub cpu_bytes_alloc: Gauge, + + /// Total number of memory allocation (mmap) events processed. + pub mem_alloc_events_total: Counter, + + /// Observed bytes requested via mmap syscalls. + pub enter_mem_alloc: Gauge, + + /// Observed scheduler wait time in nanoseconds (sched_stat_wait). + pub sched_stat_wait: Gauge, + + /// Observed scheduler runtime in nanoseconds (sched_stat_runtime). + pub sched_stat_runtime: Gauge, + + /// Current CPU idle C-state per cpu_id, updated only on state change. 
+ pub cpu_idle_state: Gauge, } impl Metrics { @@ -66,7 +90,7 @@ impl Metrics { // delta microseconds let delta_us = meter - .u64_histogram("cortexbrain_delta_us") + .u64_histogram("delta_us") .with_description("Distribution of delta_us values from timestamp events") .build(); @@ -76,6 +100,48 @@ impl Metrics { .with_description("Distribution of timestamp values from eBPF events") .build(); + // cpu bytes alloc total events + let cpu_bytes_alloc_events_total = meter + .u64_counter("bytes_alloc_events_total") + .with_description("Total bytes_alloc events occuring in the CPU") + .build(); + + // cpu bytes allocation + let cpu_bytes_alloc = meter + .i64_gauge("cpu_bytes_alloc") + .with_description("Cpu bytes allocation per event") + .build(); + + // memory allocation (mmap) events total + let mem_alloc_events_total = meter + .u64_counter("mem_alloc_events_total") + .with_description("Total number of memory allocation (mmap) events processed") + .build(); + + // bytes requested via mmap syscalls + let enter_mem_alloc = meter + .i64_gauge("enter_mem_alloc") + .with_description("Bytes requested via mmap syscalls") + .build(); + + // scheduler wait time in nanoseconds + let sched_stat_wait = meter + .i64_gauge("sched_stat_wait") + .with_description("Scheduler wait time in nanoseconds from sched_stat_wait") + .build(); + + // scheduler runtime in nanoseconds + let sched_stat_runtime = meter + .i64_gauge("sched_stat_runtime") + .with_description("Scheduler runtime in nanoseconds from sched_stat_runtime") + .build(); + + // current CPU idle C-state per cpu_id + let cpu_idle_state = meter + .i64_gauge("cpu_idle_state") + .with_description("Current CPU idle C-state per cpu_id, updated only on state change") + .build(); + Self { events_total, packets_total, @@ -83,6 +149,13 @@ impl Metrics { sk_err, delta_us, ts_us, + cpu_bytes_alloc, + cpu_bytes_alloc_events_total, + mem_alloc_events_total, + enter_mem_alloc, + sched_stat_wait, + sched_stat_runtime, + cpu_idle_state, } } @@ 
-130,4 +203,81 @@ impl Metrics { self.delta_us.record(m.delta_us, attrs); self.ts_us.record(m.ts_us, attrs); } + + /// Record a single [`CpuFrequency`] event. + /// + /// Increments `cpu_bytes_alloc_events_total` and records `bytes_alloc` in the + /// `cpu_bytes_alloc` gauge, both tagged with `tgid` and `command`. + pub fn record_cpu_bytes_alloc(&self, m: &CpuFrequency) { + let bytes_allocated = m.bytes_alloc; + let tgid = m.pid; // percpu tracepoints expose TGID in common_pid + let comm = String::from_utf8_lossy(&m.command); + let command = comm.trim_end_matches('\0').to_string(); + let attrs = &[ + KeyValue::new("tgid", tgid as i64), + KeyValue::new("command", command), + ]; + self.cpu_bytes_alloc_events_total.add(1, attrs); + self.cpu_bytes_alloc.record(bytes_allocated as i64, attrs); + } + + /// Record a single [`MemAlloc`] event (mmap syscall). + /// + /// Increments the dedicated `mem_alloc_events_total` counter and records + /// the requested length (clamped to `i64::MAX`) in the `enter_mem_alloc` gauge. The shared + /// `events_total` counter is intentionally **not** incremented for these + /// events. + pub fn record_enter_mem_alloc(&self, m: &MemAlloc) { + let comm = String::from_utf8_lossy(&m.command); + let command = comm.trim_end_matches('\0').to_string(); + let attrs = &[ + KeyValue::new("tgid", m.tgid as i64), + KeyValue::new("command", command), + ]; + + // `length` is a u64; clamp it so the i64 gauge never receives a wrapped, negative value. + let length = m.length; + self.mem_alloc_events_total.add(1, attrs); + self.enter_mem_alloc.record(length.min(i64::MAX as u64) as i64, attrs); + } + + /// Record a single [`SchedStatWait`] event. + /// + /// Records `delay` in the `sched_stat_wait` gauge. No shared or dedicated + /// counter is incremented. + pub fn record_sched_stat_wait(&self, m: &SchedStatWait) { + let comm = String::from_utf8_lossy(&m.command); + let command = comm.trim_end_matches('\0').to_string(); + let attrs = &[ + KeyValue::new("tgid", m.tgid as i64), + KeyValue::new("command", command), + ]; + + self.sched_stat_wait.record(m.delay as i64, attrs); + } + + /// Record a single [`SchedStatRuntime`] event. + /// + /// Records `runtime` in the `sched_stat_runtime` gauge. No shared or + /// dedicated counter is incremented. 
+ pub fn record_sched_stat_runtime(&self, m: &SchedStatRuntime) { + let comm = String::from_utf8_lossy(&m.command); + let command = comm.trim_end_matches('\0').to_string(); + let attrs = &[ + KeyValue::new("tgid", m.tgid as i64), + KeyValue::new("command", command), + ]; + + self.sched_stat_runtime.record(m.runtime as i64, attrs); + } + + /// Record a single [`CpuIdle`] event. + /// + /// Updates `cpu_idle_state` gauge to the latest C-state for the given + /// `cpu_id`. Events are only emitted by eBPF when the state changes. + pub fn record_cpu_idle(&self, m: &CpuIdle) { + let attrs = &[KeyValue::new("cpu_id", m.cpu_id as i64)]; + + self.cpu_idle_state.record(m.state as i64, attrs); + } } diff --git a/core/common/src/program_handlers.rs b/core/common/src/program_handlers.rs index 347be51f..fc3d6d51 100644 --- a/core/common/src/program_handlers.rs +++ b/core/common/src/program_handlers.rs @@ -1,4 +1,7 @@ -use aya::{Ebpf, programs::KProbe}; +use aya::{ + Ebpf, + programs::{KProbe, TracePoint}, +}; use std::convert::TryInto; use std::sync::{Arc, Mutex}; use tracing::{error, info}; @@ -48,3 +51,58 @@ pub fn load_program( Ok(()) } + +/// Loads the eBPF program `program_name` as a [`TracePoint`] and attaches it. +/// +/// `tracepoint_type` and `tracepoint_symbol` are passed, in that order, to +/// `TracePoint::attach`. +/// +/// # Errors +/// Fails if the `bpf` lock cannot be taken, the program is not found or cannot be +/// converted to a `TracePoint`, or if loading or attaching fails. +#[cfg(feature = "program-handlers")] +pub fn load_tracepoint_program( + bpf: Arc<Mutex<Ebpf>>, + program_name: &str, + tracepoint_type: &str, + tracepoint_symbol: &str, +) -> Result<(), anyhow::Error> { + let mut bpf_new = bpf + .lock() + .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; + + // Load and attach the eBPF program + let program: &mut TracePoint = bpf_new + .program_mut(program_name) + .ok_or_else(|| anyhow::anyhow!("Program {} not found", program_name))? + .try_into() + .map_err(|e| anyhow::anyhow!("Failed to convert program: {:?}", e))?; + + // STEP 1: load program + + program + .load() + .map_err(|e| anyhow::anyhow!("Cannot load program: {}. 
Error: {}", &program_name, e))?; + + // STEP 2: Attach the loaded program to the tracepoint + match program.attach(tracepoint_type, tracepoint_symbol) { + Ok(_) => info!( + "{} program attached successfully to tracepoint {}", + &program_name, &tracepoint_symbol + ), + Err(e) => { + error!( + "Error attaching {} program to tracepoint {}. Reason: {:?}", + &program_name, &tracepoint_symbol, e + ); + return Err(anyhow::anyhow!( + "Failed to attach program {} to tracepoint {}. Reason {:?}", + &program_name, + &tracepoint_symbol, + e + )); + } + }; + + Ok(()) +} diff --git a/core/src/components/metrics/src/helpers.rs b/core/src/components/metrics/src/helpers.rs index 804e9306..141cad6f 100644 --- a/core/src/components/metrics/src/helpers.rs +++ b/core/src/components/metrics/src/helpers.rs @@ -62,9 +62,34 @@ pub async fn event_listener(bpf_maps: BpfMapsData, meter: Meter) -> Result<(), a .remove("net_metrics") .expect("Cannot create net_perf_buffer"); + let (_cpu_frequency_events_array, cpu_frequency_perf_buffer) = maps + .remove("cpu_frequency") + .expect("Cannot create cpu_frequency_perf_buffer"); + + let (_cpu_idle_array, cpu_idle_perf_buffer) = maps + .remove("cpu_idle") + .expect("Cannot create cpu_idle perf buffer"); + + let (_mem_alloc_array, mem_alloc_perf_buffer) = maps + .remove("mem_alloc") + .expect("Cannot create mem_alloc perf buffer"); + + let (_sched_stat_wait_array, sched_stat_wait_perf_buffer) = maps + .remove("sched_stat_wait") + .expect("Cannot create sched_stat_wait perf buffer"); + + let (_sched_stat_runtime_array, sched_stat_runtime_perf_buffer) = maps + .remove("sched_stat_runtime") + .expect("Cannot create sched_stat_runtime perf buffer"); + // Allocate byte-buffers sized for each structure type let net_metrics_buffers = BufferSize::NetworkMetricsEvents.set_buffer(); let time_stamp_events_buffers = BufferSize::TimeMetricsEvents.set_buffer(); + let cpu_frequency_events_buffers = BufferSize::CpuFrequency.set_buffer(); + let cpu_idle_buffers = 
BufferSize::CpuIdle.set_buffer(); + let mem_alloc_buffers = BufferSize::MemAlloc.set_buffer(); + let sched_stat_wait_buffers = BufferSize::SchedStatWait.set_buffer(); + let sched_stat_runtime_buffers = BufferSize::SchedStatRuntime.set_buffer(); let metrics = Arc::new(Metrics::new(&meter)); @@ -100,6 +125,69 @@ pub async fn event_listener(bpf_maps: BpfMapsData, meter: Meter) -> Result<(), a }) }; + let cpu_frequency_metrics = { + let metrics = Arc::clone(&metrics); + let mut array_buffers = cpu_frequency_perf_buffer; + let mut buffers = cpu_frequency_events_buffers; + tokio::spawn(async move { + read_perf_buffer( + array_buffers, + buffers, + BufferType::CpuFrequency, + Some(metrics), + ) + .await; + }) + }; + + let cpu_idle_metrics = { + let metrics = Arc::clone(&metrics); + let mut array_buffers = cpu_idle_perf_buffer; + let mut buffers = cpu_idle_buffers; + tokio::spawn(async move { + read_perf_buffer(array_buffers, buffers, BufferType::CpuIdle, Some(metrics)).await; + }) + }; + + let mem_alloc_metrics = { + let metrics = Arc::clone(&metrics); + let mut array_buffers = mem_alloc_perf_buffer; + let mut buffers = mem_alloc_buffers; + tokio::spawn(async move { + read_perf_buffer(array_buffers, buffers, BufferType::MemAlloc, Some(metrics)).await; + }) + }; + + let sched_stat_wait_metrics = { + let metrics = Arc::clone(&metrics); + let mut array_buffers = sched_stat_wait_perf_buffer; + let mut buffers = sched_stat_wait_buffers; + tokio::spawn(async move { + read_perf_buffer( + array_buffers, + buffers, + BufferType::SchedStatWait, + Some(metrics), + ) + .await; + }) + }; + + let sched_stat_runtime_metrics = { + let metrics = Arc::clone(&metrics); + let mut array_buffers = sched_stat_runtime_perf_buffer; + let mut buffers = sched_stat_runtime_buffers; + tokio::spawn(async move { + read_perf_buffer( + array_buffers, + buffers, + BufferType::SchedStatRuntime, + Some(metrics), + ) + .await; + }) + }; + info!("Event listeners started, entering main loop..."); 
tokio::select! { @@ -115,6 +203,36 @@ pub async fn event_listener(bpf_maps: BpfMapsData, meter: Meter) -> Result<(), a } } + result = cpu_frequency_metrics => { + if let Err(e) = result { + error!("Cpu frequency events task failed: {:?}", e); + } + } + + result = cpu_idle_metrics => { + if let Err(e) = result { + error!("CpuIdle events task failed: {:?}", e); + } + } + + result = mem_alloc_metrics => { + if let Err(e) = result { + error!("MemAlloc events task failed: {:?}", e); + } + } + + result = sched_stat_wait_metrics => { + if let Err(e) = result { + error!("SchedStatWait events task failed: {:?}", e); + } + } + + result = sched_stat_runtime_metrics => { + if let Err(e) = result { + error!("SchedStatRuntime events task failed: {:?}", e); + } + } + _ = signal::ctrl_c() => { info!("Ctrl-C received, shutting down..."); } diff --git a/core/src/components/metrics/src/main.rs b/core/src/components/metrics/src/main.rs index 0211be68..bcf7de72 100644 --- a/core/src/components/metrics/src/main.rs +++ b/core/src/components/metrics/src/main.rs @@ -26,7 +26,7 @@ use cortexbrain_common::{ constants, logger::otlp_logger_init, map_handlers::{init_bpf_maps, map_pinner}, - program_handlers::load_program, + program_handlers::{load_program, load_tracepoint_program}, }; #[tokio::main] @@ -46,6 +46,11 @@ async fn main() -> Result<(), anyhow::Error> { let tcp_bpf = bpf.clone(); let tcp_rev_bpf = bpf.clone(); let tcp_v6_bpf = bpf.clone(); + let cpu_frequency = bpf.clone(); + let cpu_idle_bpf = bpf.clone(); + let mem_alloc_bpf = bpf.clone(); + let sched_stat_wait_bpf = bpf.clone(); + let sched_stat_runtime_bpf = bpf.clone(); info!("Running Ebpf logger"); info!("loading programs"); @@ -53,7 +58,15 @@ async fn main() -> Result<(), anyhow::Error> { let bpf_map_save_path = env::var(constants::PIN_MAP_PATH).context("PIN_MAP_PATH environment variable required")?; - let map_data = vec!["time_stamp_events".to_string(), "net_metrics".to_string()]; + let map_data = vec![ + 
"time_stamp_events".to_string(), + "net_metrics".to_string(), + "cpu_frequency".to_string(), + "cpu_idle".to_string(), + "mem_alloc".to_string(), + "sched_stat_wait".to_string(), + "sched_stat_runtime".to_string(), + ]; match init_bpf_maps(bpf.clone(), map_data) { Ok(bpf_maps) => { @@ -85,6 +98,51 @@ async fn main() -> Result<(), anyhow::Error> { .context( "An error occurred during the execution of load_program function", )?; + load_tracepoint_program( + cpu_frequency, + "trace_cpu_frequency", + "percpu", + "percpu_alloc_percpu", + ) + .context( + "An error occurred during the execution of load_program function", + )?; + load_tracepoint_program( + cpu_idle_bpf, + "trace_cpu_idle", + "power", + "cpu_idle", + ) + .context( + "An error occurred during the execution of load_program function", + )?; + load_tracepoint_program( + mem_alloc_bpf, + "trace_enter_mmap", + "syscalls", + "sys_enter_mmap", + ) + .context( + "An error occurred during the execution of load_program function", + )?; + load_tracepoint_program( + sched_stat_wait_bpf, + "trace_sched_stat_wait", + "sched", + "sched_stat_wait", + ) + .context( + "An error occurred during the execution of load_program function", + )?; + load_tracepoint_program( + sched_stat_runtime_bpf, + "trace_sched_stat_runtime", + "sched", + "sched_stat_runtime", + ) + .context( + "An error occurred during the execution of load_program function", + )?; } // Hand off to the async event consumer diff --git a/core/src/components/metrics_tracer/src/cpu.rs b/core/src/components/metrics_tracer/src/cpu.rs new file mode 100644 index 00000000..374ea36a --- /dev/null +++ b/core/src/components/metrics_tracer/src/cpu.rs @@ -0,0 +1,75 @@ +//tracepoint:power:cpu_frequency +//tracepoint:power:cpu_frequency_limits +//tracepoint:power:cpu_idle +//tracepoint:power:cpu_idle_miss +use aya_ebpf::{EbpfContext, programs::TracePointContext}; +use aya_log_ebpf::info; + +use crate::data_structures::{CPU_FREQUENCY, CPU_IDLE, CPU_IDLE_LAST_STATE, CpuFrequency, 
CpuIdle}; + +pub fn cpu_idle(ctx: TracePointContext) -> Result<(), i64> { + let state_offset = 8; + let cpu_id_offset = 12; + let state: u32 = unsafe { ctx.read_at(state_offset) }?; + let cpu_id: u32 = unsafe { ctx.read_at(cpu_id_offset) }?; + + let map_ptr = unsafe { &raw mut CPU_IDLE_LAST_STATE }; + + // skip the data when: + // - last_state is equal to the current state + // - last_state is equal to 4294967295 or -1. This codes means that the cpu is exiting from the current state and entering a new state + let emit = match unsafe { (*map_ptr).get(&cpu_id) } { + Some(last_state) if (*last_state == state) || (*last_state == 4294967295) => false, + _ => true, + }; + + if emit { + let _ = unsafe { (*map_ptr).insert(&cpu_id, &state, 0) }; + let event = CpuIdle { cpu_id, state }; + unsafe { CPU_IDLE.output(&ctx, &event, 0) }; + } + + info!(&ctx, "CPU idle: State: {} cpu_id: {}", state, cpu_id); + Ok(()) +} + +pub fn per_cpu_bytes_alloc(ctx: &TracePointContext) -> Result<((u32, u32, [u8; 16])), i64> { + let bytes_alloc_offset = 64; + let pid_offset = 4; + let bytes_alloc = unsafe { ctx.read_at(bytes_alloc_offset) }?; + let pid = unsafe { ctx.read_at(pid_offset) }?; + let command = ctx.command()?; + + //let cpu_freq_data = CpuFrequency { + // cpu_id, + // cpu_freq: state, + //}; + + //CPU_FREQUENCY.output(&ctx, &cpu_freq_data, 0); + + Ok((bytes_alloc, pid, command)) +} + +pub fn sched_stat_wait(ctx: &TracePointContext) -> Result<((u32, u64, [u8; 16])), i64> { + let pid_offset = 4; + let delay_offset = 16; + + let pid = unsafe { ctx.read_at(pid_offset) }?; + + let delay = unsafe { ctx.read_at(delay_offset) }?; + let command = ctx.command()?; + + Ok((pid, delay, command)) +} + +pub fn sched_stat_runtime(ctx: &TracePointContext) -> Result<((u32, u64, [u8; 16])), i64> { + let pid_offset = 4; + let runtime_offset = 16; + + let pid = unsafe { ctx.read_at(pid_offset) }?; + + let runtime = unsafe { ctx.read_at(runtime_offset) }?; + let command = ctx.command()?; + + Ok((pid, 
runtime, command)) +} diff --git a/core/src/components/metrics_tracer/src/data_structures.rs b/core/src/components/metrics_tracer/src/data_structures.rs index e9866a83..d74ef35a 100644 --- a/core/src/components/metrics_tracer/src/data_structures.rs +++ b/core/src/components/metrics_tracer/src/data_structures.rs @@ -1,22 +1,25 @@ -use aya_ebpf::{macros::map, maps::{LruPerCpuHashMap, HashMap, PerfEventArray}}; +use aya_ebpf::{ + macros::map, + maps::{HashMap, LruPerCpuHashMap, PerfEventArray}, +}; pub const TASK_COMM_LEN: usize = 16; -#[repr(C,packed)] +#[repr(C, packed)] pub struct NetworkMetrics { pub tgid: u32, pub comm: [u8; TASK_COMM_LEN], pub ts_us: u64, - pub sk_err: i32, // Offset 284 - pub sk_err_soft: i32, // Offset 600 - pub sk_backlog_len: i32, // Offset 196 - pub sk_write_memory_queued: i32,// Offset 376 - pub sk_receive_buffer_size: i32,// Offset 244 - pub sk_ack_backlog: u32, // Offset 604 - pub sk_drops: i32, // Offset 136 + pub sk_err: i32, // Offset 284 + pub sk_err_soft: i32, // Offset 600 + pub sk_backlog_len: i32, // Offset 196 + pub sk_write_memory_queued: i32, // Offset 376 + pub sk_receive_buffer_size: i32, // Offset 244 + pub sk_ack_backlog: u32, // Offset 604 + pub sk_drops: i32, // Offset 136 } -#[repr(C,packed)] +#[repr(C, packed)] #[derive(Copy, Clone)] pub struct TimeStampStartInfo { pub comm: [u8; TASK_COMM_LEN], @@ -25,7 +28,7 @@ pub struct TimeStampStartInfo { } // Event we send to userspace when latency is computed -#[repr(C,packed)] +#[repr(C, packed)] #[derive(Copy, Clone)] pub struct TimeStampEvent { pub delta_us: u64, @@ -41,6 +44,48 @@ pub struct TimeStampEvent { pub daddr_v6: [u32; 4], } +#[repr(C, packed)] +#[derive(Copy, Clone)] +pub struct CpuFrequency { + //pub(crate) cpu_id: u32, + //pub(crate) cpu_freq: u32, + pub(crate) bytes_alloc: u32, + pub(crate) pid: u32, + pub(crate) command: [u8; 16], +} + +#[repr(C, packed)] +#[derive(Copy, Clone)] +pub struct MemAlloc { + pub(crate) tgid: u32, + pub(crate) length: u64, + 
pub(crate) addr: u64, + pub(crate) command: [u8; 16], +} + +#[repr(C, packed)] +#[derive(Copy, Clone)] +pub struct SchedStatWait { + pub(crate) tgid: u32, + pub(crate) delay: u64, + pub(crate) command: [u8; 16], +} + +#[repr(C, packed)] +#[derive(Copy, Clone)] +pub struct SchedStatRuntime { + pub(crate) tgid: u32, + pub(crate) runtime: u64, + pub(crate) command: [u8; 16], +} + +#[repr(C, packed)] +#[derive(Copy, Clone)] +pub struct CpuIdle { + pub(crate) cpu_id: u32, + pub(crate) state: u32, +} + // Map: connect-start timestamp by socket pointer #[map(name = "time_stamp_start")] pub static mut TIME_STAMP_START: HashMap<*mut core::ffi::c_void, TimeStampStartInfo> = @@ -48,7 +93,27 @@ pub static mut TIME_STAMP_START: HashMap<*mut core::ffi::c_void, TimeStampStartI // Perf event channel for emitting Event to userspace #[map(name = "time_stamp_events")] -pub static mut TIME_STAMP_EVENTS: PerfEventArray = PerfEventArray::::new(0); +pub static mut TIME_STAMP_EVENTS: PerfEventArray = + PerfEventArray::::new(0); #[map(name = "net_metrics")] pub static NET_METRICS: PerfEventArray = PerfEventArray::new(0); + +#[map(name = "cpu_frequency")] +pub static CPU_FREQUENCY: PerfEventArray = PerfEventArray::new(0); + +#[map(name = "mem_alloc")] +pub static MEM_ALLOC: PerfEventArray = PerfEventArray::new(0); + +#[map(name = "sched_stat_wait")] +pub static SCHED_STAT_WAIT: PerfEventArray = PerfEventArray::new(0); + +#[map(name = "sched_stat_runtime")] +pub static SCHED_STAT_RUNTIME: PerfEventArray = PerfEventArray::new(0); + +#[map(name = "cpu_idle")] +pub static CPU_IDLE: PerfEventArray = PerfEventArray::new(0); + +#[map(name = "cpu_idle_last_state")] +pub static mut CPU_IDLE_LAST_STATE: HashMap = + HashMap::::with_max_entries(256, 0); diff --git a/core/src/components/metrics_tracer/src/main.rs b/core/src/components/metrics_tracer/src/main.rs index 216a6aca..55faa453 100644 --- a/core/src/components/metrics_tracer/src/main.rs +++ b/core/src/components/metrics_tracer/src/main.rs @@ 
-3,22 +3,34 @@ #![allow(warnings)] mod bindings; +mod cpu; mod data_structures; - -use core::{mem, ptr}; +mod memory; use crate::bindings::net_device; -use aya_ebpf::helpers::generated::{bpf_ktime_get_ns, bpf_perf_event_output}; +use crate::cpu::{cpu_idle, per_cpu_bytes_alloc, sched_stat_runtime, sched_stat_wait}; +use crate::data_structures::CpuFrequency; +use crate::data_structures::NET_METRICS; +use crate::data_structures::{CPU_FREQUENCY, SchedStatWait}; +use crate::data_structures::{ + CPU_IDLE, NetworkMetrics, TASK_COMM_LEN, TIME_STAMP_EVENTS, TIME_STAMP_START, TimeStampEvent, + TimeStampStartInfo, +}; +use crate::data_structures::{MEM_ALLOC, SCHED_STAT_RUNTIME, SCHED_STAT_WAIT}; +use crate::data_structures::{MemAlloc, SchedStatRuntime}; +use crate::memory::enter_mmap; use aya_ebpf::EbpfContext; -use aya_ebpf::helpers::{bpf_get_current_comm, bpf_probe_read_kernel, bpf_probe_read_kernel_str_bytes}; -use aya_ebpf::macros::{kprobe, map}; -use aya_ebpf::maps::{HashMap, PerfEventArray}; -use aya_ebpf::programs::ProbeContext; use aya_ebpf::helpers::bpf_get_current_pid_tgid; -use crate::data_structures::{NetworkMetrics, TimeStampEvent, TimeStampStartInfo, TASK_COMM_LEN, TIME_STAMP_EVENTS, TIME_STAMP_START}; -use crate::data_structures::NET_METRICS; +use aya_ebpf::helpers::generated::{bpf_ktime_get_ns, bpf_perf_event_output}; +use aya_ebpf::helpers::{ + bpf_get_current_comm, bpf_probe_read_kernel, bpf_probe_read_kernel_str_bytes, +}; +use aya_ebpf::macros::{kprobe, map, tracepoint}; +use aya_ebpf::maps::{HashMap, PerfEventArray}; +use aya_ebpf::programs::{ProbeContext, TracePointContext}; +use core::{mem, ptr}; -const AF_INET: u16 = 2; +const AF_INET: u16 = 2; const AF_INET6: u16 = 10; const TCP_SYN_SENT: u8 = 2; @@ -48,13 +60,33 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result { let sk_ack_backlog_offset = 604; let sk_drops_offset = 136; - let sk_err = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_err_offset) as *const i32).map_err(|_| 1)? 
}; - let sk_err_soft = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_err_soft_offset) as *const i32).map_err(|_| 1)? }; - let sk_backlog_len = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_backlog_len_offset) as *const i32).map_err(|_| 1)? }; - let sk_write_memory_queued = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_write_memory_queued_offset) as *const i32).map_err(|_| 1)? }; - let sk_receive_buffer_size = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_receive_buffer_size_offset) as *const i32).map_err(|_| 1)? }; - let sk_ack_backlog = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_ack_backlog_offset) as *const u32).map_err(|_| 1)? }; - let sk_drops = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_drops_offset) as *const i32).map_err(|_| 1)? }; + let sk_err = unsafe { + bpf_probe_read_kernel::(sk_pointer.add(sk_err_offset) as *const i32).map_err(|_| 1)? + }; + let sk_err_soft = unsafe { + bpf_probe_read_kernel::(sk_pointer.add(sk_err_soft_offset) as *const i32) + .map_err(|_| 1)? + }; + let sk_backlog_len = unsafe { + bpf_probe_read_kernel::(sk_pointer.add(sk_backlog_len_offset) as *const i32) + .map_err(|_| 1)? + }; + let sk_write_memory_queued = unsafe { + bpf_probe_read_kernel::(sk_pointer.add(sk_write_memory_queued_offset) as *const i32) + .map_err(|_| 1)? + }; + let sk_receive_buffer_size = unsafe { + bpf_probe_read_kernel::(sk_pointer.add(sk_receive_buffer_size_offset) as *const i32) + .map_err(|_| 1)? + }; + let sk_ack_backlog = unsafe { + bpf_probe_read_kernel::(sk_pointer.add(sk_ack_backlog_offset) as *const u32) + .map_err(|_| 1)? + }; + let sk_drops = unsafe { + bpf_probe_read_kernel::(sk_pointer.add(sk_drops_offset) as *const i32) + .map_err(|_| 1)? 
+ }; let net_metrics = NetworkMetrics { tgid: tgid, @@ -79,16 +111,21 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result { // Monitor on tcp_sendmsg, tcp_v4_connect #[kprobe] fn tcp_v6_connect(ctx: ProbeContext) -> u32 { - match on_connect(ctx) { Ok(_) => 0, Err(e) => e as u32 } + match on_connect(ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } } // Monitor on tcp_sendmsg, tcp_v4_connect #[kprobe] fn tcp_v4_connect(ctx: ProbeContext) -> u32 { - match on_connect(ctx) { Ok(_) => 0, Err(e) => e as u32 } + match on_connect(ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } } - fn on_connect(ctx: ProbeContext) -> Result<(), i64> { let sk = ctx.arg::<*mut bindings::sock>(0).ok_or(1i64)?; if sk.is_null() { @@ -107,14 +144,19 @@ fn on_connect(ctx: ProbeContext) -> Result<(), i64> { start.comm.copy_from_slice(&comm); } let map_ptr = &raw mut TIME_STAMP_START; - (*map_ptr).insert(&(sk as *mut core::ffi::c_void), &start, 0).map_err(|_| 1)?; + (*map_ptr) + .insert(&(sk as *mut core::ffi::c_void), &start, 0) + .map_err(|_| 1)?; } Ok(()) } #[kprobe] fn tcp_rcv_state_process(ctx: ProbeContext) -> u32 { - match on_rcv_state_process(ctx) { Ok(_) => 0, Err(e) => e as u32 } + match on_rcv_state_process(ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } } fn on_rcv_state_process(ctx: ProbeContext) -> Result<(), i64> { @@ -122,25 +164,25 @@ fn on_rcv_state_process(ctx: ProbeContext) -> Result<(), i64> { let sk = ctx.arg::<*mut bindings::sock>(0).unwrap_or(ptr::null_mut()); let sk = if sk.is_null() { ctx.arg::<*mut bindings::sock>(1).ok_or(1i64)? 
- } else { sk }; + } else { + sk + }; if sk.is_null() { return Err(1); } let skc_daddr_off = 0; - let skc_rcv_saddr_off = 4; + let skc_rcv_saddr_off = 4; let skc_dport_off = 12; let skc_num_off = 14; let skc_family_off = 16; let skc_state_off = 18; - let skc_v6_daddr_off = 56; - let skc_v6_rcv_saddr_off = 72; - - let state = unsafe { - bpf_probe_read_kernel::((sk as usize + skc_state_off) as *const u8) - }.map_err(|_| 1)?; + let skc_v6_daddr_off = 56; + let skc_v6_rcv_saddr_off = 72; + let state = unsafe { bpf_probe_read_kernel::((sk as usize + skc_state_off) as *const u8) } + .map_err(|_| 1)?; if state != TCP_SYN_SENT { return Ok(()); @@ -149,7 +191,8 @@ fn on_rcv_state_process(ctx: ProbeContext) -> Result<(), i64> { let start = unsafe { let map_ptr = &raw const TIME_STAMP_START; (*map_ptr).get(&((sk as usize) as *mut core::ffi::c_void)) - }.ok_or(1i64)?; + } + .ok_or(1i64)?; let now = unsafe { bpf_ktime_get_ns() }; let delta = now as i64 - start.ts_ns as i64; if delta <= 0 { @@ -176,16 +219,13 @@ fn on_rcv_state_process(ctx: ProbeContext) -> Result<(), i64> { // family, ports ev.af = unsafe { - bpf_probe_read_kernel::((sk as usize + skc_family_off) as *const u16) - .map_err(|_| 1)? + bpf_probe_read_kernel::((sk as usize + skc_family_off) as *const u16).map_err(|_| 1)? }; ev.lport = unsafe { - bpf_probe_read_kernel::((sk as usize + skc_num_off) as *const u16) - .map_err(|_| 1)? + bpf_probe_read_kernel::((sk as usize + skc_num_off) as *const u16).map_err(|_| 1)? }; ev.dport_be = unsafe { - bpf_probe_read_kernel::((sk as usize + skc_dport_off) as *const u16) - .map_err(|_| 1)? + bpf_probe_read_kernel::((sk as usize + skc_dport_off) as *const u16).map_err(|_| 1)? }; if ev.af == AF_INET { @@ -202,13 +242,13 @@ fn on_rcv_state_process(ctx: ProbeContext) -> Result<(), i64> { for i in 0..4 { ev.saddr_v6[i] = unsafe { bpf_probe_read_kernel::( - (sk as usize + skc_v6_rcv_saddr_off + i * 4) as *const u32 - ).map_err(|_| 1)? 
+ (sk as usize + skc_v6_rcv_saddr_off + i * 4) as *const u32, + ) + .map_err(|_| 1)? }; ev.daddr_v6[i] = unsafe { - bpf_probe_read_kernel::( - (sk as usize + skc_v6_daddr_off + i * 4) as *const u32 - ).map_err(|_| 1)? + bpf_probe_read_kernel::((sk as usize + skc_v6_daddr_off + i * 4) as *const u32) + .map_err(|_| 1)? }; } } @@ -226,11 +266,112 @@ fn on_rcv_state_process(ctx: ProbeContext) -> Result<(), i64> { let _ = (*map_ptr).remove(&((sk as usize) as *mut core::ffi::c_void)); } + Ok(()) +} + +#[tracepoint] +fn trace_cpu_frequency(ctx: TracePointContext) -> u32 { + match trace_cpu_metrics(&ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } +} + +#[tracepoint] +fn trace_cpu_idle(ctx: TracePointContext) -> u32 { + match cpu_idle(ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } +} + +fn trace_cpu_metrics(ctx: &TracePointContext) -> Result<(), i64> { + let (bytes_alloc, pid, command) = per_cpu_bytes_alloc(ctx)?; + //let (cpu_id, cpu_freq) = cpu_frequency(&ctx)?; + let cpu_metrics = CpuFrequency { + // cpu_id, + // cpu_freq, + bytes_alloc, + pid, + command, + }; + + unsafe { CPU_FREQUENCY.output(ctx, &cpu_metrics, 0) }; + + Ok(()) +} + +/// Tracepoint attached to `syscalls:sys_enter_mmap`. +/// +/// Emits a `MemAlloc` event for every `mmap` syscall. No PID/command filter +/// is applied yet (see the next update), so this will generate events for every +/// process in the system. 
+#[tracepoint] +fn trace_enter_mmap(ctx: TracePointContext) -> u32 { + match trace_memory_allocation(&ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } +} + +fn trace_memory_allocation(ctx: &TracePointContext) -> Result<(), i64> { + let (tgid, addr, length, command) = enter_mmap(ctx)?; + + let memory_alloc_metrics = MemAlloc { + tgid, + addr, + length, + command, + }; + + unsafe { MEM_ALLOC.output(ctx, &memory_alloc_metrics, 0) }; + + Ok(()) +} + +#[tracepoint] +fn trace_sched_stat_wait(ctx: TracePointContext) -> u32 { + match sched_stat_wait_tracer(&ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } +} + +fn sched_stat_wait_tracer(ctx: &TracePointContext) -> Result<(), i64> { + let (tgid, delay, command) = sched_stat_wait(ctx)?; + + let sched_stat_wait_data = SchedStatWait { + tgid, + delay, + command, + }; + + unsafe { SCHED_STAT_WAIT.output(ctx, &sched_stat_wait_data, 0) }; Ok(()) } +#[tracepoint] +fn trace_sched_stat_runtime(ctx: TracePointContext) -> u32 { + match sched_stat_runtime_tracer(&ctx) { + Ok(_) => 0, + Err(e) => e as u32, + } +} + +fn sched_stat_runtime_tracer(ctx: &TracePointContext) -> Result<(), i64> { + let (tgid, runtime, command) = sched_stat_runtime(ctx)?; + let sched_stat_runtime_data = SchedStatRuntime { + tgid, + runtime, + command, + }; + + unsafe { SCHED_STAT_RUNTIME.output(ctx, &sched_stat_runtime_data, 0) }; + + Ok(()) +} // panic handler #[panic_handler] diff --git a/core/src/components/metrics_tracer/src/memory.rs b/core/src/components/metrics_tracer/src/memory.rs new file mode 100644 index 00000000..de14c7a8 --- /dev/null +++ b/core/src/components/metrics_tracer/src/memory.rs @@ -0,0 +1,16 @@ +use aya_ebpf::{EbpfContext, programs::TracePointContext}; + +/// Read the fields of the `syscalls:sys_enter_mmap` tracepoint. +pub fn enter_mmap(ctx: &TracePointContext) -> Result<((u32, u64, u64, [u8; 16])), i64> { + // For syscall tracepoints `common_pid` is the TGID of the calling thread. 
+ let tgid_offset = 4; + let addr_offset = 16; + let len_offset = 24; + + let tgid: u32 = unsafe { ctx.read_at(tgid_offset) }?; + let addr: u64 = unsafe { ctx.read_at(addr_offset) }?; + let len: u64 = unsafe { ctx.read_at(len_offset) }?; + let command = ctx.command()?; + + Ok((tgid, addr, len, command)) +} diff --git a/core/src/components/metrics_tracer/src/mod.rs b/core/src/components/metrics_tracer/src/mod.rs index 76d66830..56f80cc7 100644 --- a/core/src/components/metrics_tracer/src/mod.rs +++ b/core/src/components/metrics_tracer/src/mod.rs @@ -1,2 +1,4 @@ mod bindings; -mod data_structures; \ No newline at end of file +mod cpu; +mod data_structures; +mod memory; diff --git a/core/src/testing/metrics.yaml b/core/src/testing/metrics.yaml index 8a6c7d82..7e748fd6 100644 --- a/core/src/testing/metrics.yaml +++ b/core/src/testing/metrics.yaml @@ -19,7 +19,7 @@ spec: hostNetwork: true containers: - name: metrics - image: lorenzotettamanti/cortexflow-metrics:otel-test-2 + image: lorenzotettamanti/cortexflow-metrics:otel-test-15 command: ["/bin/bash", "-c"] args: - | @@ -47,6 +47,9 @@ spec: - name: kernel-dev mountPath: /lib/modules readOnly: false + - name: tracefs + mountPath: /sys/kernel/debug + readOnly: false securityContext: privileged: true allowPrivilegeEscalation: true @@ -59,7 +62,7 @@ spec: - SYS_PTRACE - name: bpftool-control-manager image: danielpacak/bpftool-runner:latest - command: ["/bin/bash", "-c","sleep infinity"] + command: ["/bin/bash", "-c", "sleep infinity"] volumeMounts: - name: bpf mountPath: /sys/fs/bpf @@ -71,6 +74,9 @@ spec: - name: kernel-dev mountPath: /lib/modules readOnly: false + - name: tracefs + mountPath: /sys/kernel/debug + readOnly: false securityContext: privileged: true allowPrivilegeEscalation: true @@ -94,3 +100,9 @@ spec: hostPath: path: /lib/modules type: Directory + + - name: tracefs + hostPath: + path: /sys/kernel/debug + type: Directory +