diff --git a/Cargo.lock b/Cargo.lock index 6057d359..b536e65e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,6 +414,16 @@ dependencies = [ "log", ] +[[package]] +name = "epoll" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e74d68fe2927dbf47aa976d14d93db9b23dced457c7bb2bdc6925a16d31b736e" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -438,6 +448,7 @@ dependencies = [ "aya", "clap", "env_logger", + "epoll", "fact-api", "fact-ebpf", "glob", diff --git a/Cargo.toml b/Cargo.toml index 85be5dc1..cc7ca4db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ env_logger = { version = "0.11.5", default-features = false, features = ["humant glob = "0.3.3" globset = "0.4.18" governor = "0.10.4" +epoll = "4.4.0" http-body-util = "0.1.3" hyper = { version = "1.6.0", default-features = false } hyper-tls = "0.6.0" diff --git a/about.toml b/about.toml index 26d24936..c1e5e27c 100644 --- a/about.toml +++ b/about.toml @@ -5,5 +5,6 @@ accepted = [ "Zlib", "ISC", "Unicode-3.0", + "MPL-2.0", ] diff --git a/fact/Cargo.toml b/fact/Cargo.toml index 5a7d900c..9b6f9f34 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -33,6 +33,7 @@ serde_json = { workspace = true } shlex = { workspace = true } uuid = { workspace = true } yaml-rust2 = { workspace = true } +epoll = { workspace = true } fact-api = { path = "../fact-api" } fact-ebpf = { path = "../fact-ebpf" } diff --git a/fact/src/bpf/mod.rs b/fact/src/bpf/mod.rs index 917eae3a..7814e795 100644 --- a/fact/src/bpf/mod.rs +++ b/fact/src/bpf/mod.rs @@ -1,4 +1,9 @@ -use std::{io, path::PathBuf}; +use std::{ + io, + os::fd::AsRawFd, + path::PathBuf, + thread::{self, JoinHandle}, +}; use anyhow::{Context, bail}; use aya::{ @@ -10,11 +15,7 @@ use checks::Checks; use globset::{Glob, GlobSet, GlobSetBuilder}; use libc::c_char; use log::{error, info, warn}; -use tokio::{ - io::unix::AsyncFd, - sync::{mpsc, watch}, - task::JoinHandle, -}; +use tokio::sync::{mpsc, watch}; use crate::{config::BpfConfig, event::Event, host_info, metrics::EventCounter}; @@ -211,22 +212,40 @@ impl Bpf { // Gather events from the ring buffer and print them out. pub fn start( mut self, - mut running: watch::Receiver, + running: watch::Receiver, event_counter: EventCounter, ) -> JoinHandle> { info!("Starting BPF worker..."); - tokio::spawn(async move { - let rb = self.take_ringbuffer()?; - let mut fd = AsyncFd::new(rb)?; + thread::spawn(move || { + let mut rb = self.take_ringbuffer()?; + + let rb_event = epoll::Event::new(epoll::Events::EPOLLIN, 0); + let poller = match epoll::create(false) { + Ok(p) => p, + Err(e) => bail!("Failed to create epoll: {e:?}"), + }; + if let Err(e) = epoll::ctl( + poller, + epoll::ControlOptions::EPOLL_CTL_ADD, + rb.as_raw_fd(), + rb_event, + ) { + bail!("Failed to add ringbuffer to epoll: {e:?}"); + } loop { - tokio::select! { - guard = fd.readable_mut() => { - let mut guard = guard - .context("ringbuffer guard held while runtime is stopping")?; - let ringbuf = guard.get_inner_mut(); - while let Some(event) = ringbuf.next() { + if running.has_changed()? && !*running.borrow() { + break; + } + + if self.paths_config.has_changed()? { + self.load_paths().context("Failed to load paths")?; + } + + match epoll::wait(poller, 100, &mut [rb_event]) { + Ok(n) if n != 0 => { + while let Some(event) = rb.next() { let event: &event_t = unsafe { &*(event.as_ptr() as *const _) }; let event = match Event::try_from(event) { Ok(event) => { @@ -234,13 +253,14 @@ impl Bpf { // its host path, but we don't have that context here, // so we let the event go into HostScanner and make the // decision there. - if !event.is_monitored_by_parent() && - event.is_ignored(&self.paths_globset) { + if !event.is_monitored_by_parent() + && event.is_ignored(&self.paths_globset) + { event_counter.dropped(); continue; } event - }, + } Err(e) => { error!("Failed to parse event: '{e}'"); event_counter.dropped(); @@ -249,25 +269,18 @@ impl Bpf { }; event_counter.added(); - if self.tx.send(event).await.is_err() { + if self.tx.blocking_send(event).is_err() { info!("No BPF consumers left, stopping..."); break; } } - guard.clear_ready(); - }, - _ = self.paths_config.changed() => { - self.load_paths().context("Failed to load paths")?; - }, - _ = running.changed() => { - if !*running.borrow() { - info!("Stopping BPF worker..."); - break; - } - }, + } + Ok(_) => {} + Err(e) => bail!("Failed to wait for ringbuffer events: {e:?}"), } } + info!("Stopping BPF worker..."); Ok(()) }) } @@ -289,8 +302,8 @@ mod bpf_tests { use super::*; - #[tokio::test] - async fn test_basic() { + #[test] + fn test_basic() { if let Ok(value) = std::env::var("FACT_LOGLEVEL") { let value = value.to_lowercase(); if value == "debug" || value == "trace" { @@ -312,7 +325,7 @@ mod bpf_tests { let handle = bpf.start(run_rx, exporter.metrics.bpf_worker.clone()); - tokio::time::sleep(Duration::from_millis(500)).await; + thread::sleep(Duration::from_millis(500)); // Create a file let file = NamedTempFile::new_in(&monitored_path).expect("Failed to create temporary file"); @@ -383,24 +396,26 @@ mod bpf_tests { // Close the file, removing it file.close().expect("Failed to close temp file"); - let wait = timeout(Duration::from_secs(1), async move { - for expected in expected_events { - println!("expected: {expected:#?}"); - while let Some(event) = rx.recv().await { - println!("{event:#?}"); - if event == expected { - println!("Found!"); - break; + tokio::runtime::Runtime::new().unwrap().block_on(async { + let wait = timeout(Duration::from_secs(1), async { + for expected in expected_events { + println!("expected: {expected:#?}"); + while let Some(event) = rx.recv().await { + println!("{event:#?}"); + if event == expected { + println!("Found!"); + break; + } } } + }); + + tokio::select! { + res = wait => res.unwrap(), } }); - tokio::select! { - res = wait => res.unwrap(), - res = handle => res.unwrap().unwrap(), - } - run_tx.send(false).unwrap(); + handle.join().unwrap().unwrap(); } } diff --git a/fact/src/lib.rs b/fact/src/lib.rs index bd9d708b..a6d719b6 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -109,9 +109,16 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { let mut host_scanner_handle = host_scanner.start(); let mut rate_limiter_handle = rate_limiter.start(); endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start(); - let mut bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone()); + let bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone()); reloader.start(running.subscribe()); + let (bpf_shutdown_tx, mut bpf_shutdown_rx) = + tokio::sync::mpsc::channel::>(1); + tokio::task::spawn_blocking(move || { + let res = bpf_handle.join().unwrap(); + bpf_shutdown_tx.blocking_send(res).unwrap(); + }); + let mut sigterm = signal(SignalKind::terminate())?; let mut sighup = signal(SignalKind::hangup())?; loop { @@ -119,12 +126,11 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { _ = tokio::signal::ctrl_c() => break, _ = sigterm.recv() => break, _ = sighup.recv() => config_trigger.notify_one(), - res = bpf_handle.borrow_mut() => { + res = bpf_shutdown_rx.recv() => { match res { - Ok(res) => if let Err(e) = res { - warn!("BPF worker errored out: {e:?}"); - } - Err(e) => warn!("BPF task errored out: {e:?}"), + Some(Ok(())) => info!("BPF worker finished"), + Some(Err(e)) => warn!("BPF worker errored out: {e:?}"), + None => warn!("BPF worker channel closed"), } break; }