From 9a4a36287a0abb2c7e6c2d5c14ea36146331bdcc Mon Sep 17 00:00:00 2001 From: RoyLin Date: Wed, 24 Jun 2026 17:09:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20richer=20signals=20=E2=80=94=20exit-sig?= =?UTF-8?q?nal,=20LLM=20model,=20dest-port,=20uid=20(v0.10.0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ProcessExit gains signal (do_exit kprobe, thread-group-leader gated -> one event per process): clean / SIGSEGV crash / SIGKILL-OOM, which the old exit_group tracepoint never saw. - LlmApi {model, prompt_tokens, completion_tokens} parsed (userspace) from opt-in TLS content. - Egress gains dest port (service class); ToolExec gains uid (root/privesc) — both were already read in-kernel and discarded (free wins). - build.rs rerun-if-changed on the ebpf crate (no more stale bytecode). Validated live on the prod cluster; adversarial fan-out caught + fixed a per-thread duplication regression + a broken test before release. 0.9.3 -> 0.10.0. --- CHANGELOG.md | 27 +++++++++ Cargo.lock | 8 +-- Cargo.toml | 2 +- README.md | 3 +- a3s-observer-collector/Cargo.toml | 2 +- a3s-observer-collector/build.rs | 4 ++ a3s-observer-collector/src/main.rs | 90 +++++++++++++++++++++++++++--- a3s-observer-common/Cargo.toml | 2 +- a3s-observer-common/src/lib.rs | 6 +- a3s-observer-ebpf/Cargo.toml | 2 +- a3s-observer-ebpf/src/main.rs | 26 ++++++--- deploy/daemonset.yaml | 2 +- src/model.rs | 25 ++++++++- src/traits.rs | 1 + 14 files changed, 169 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dd9964..287a22f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,33 @@ All notable changes to a3s-observer will be documented in this file. +## [0.10.0] — richer signals: exit-signal, LLM model, dest-port, uid + +### Added + +- **Exit signal** — `ProcessExit` now carries `signal` (0 clean / 9 SIGKILL+OOM / 11 SIGSEGV + crash). The probe moved from the `sys_enter_exit_group` tracepoint to a `do_exit` kprobe, so + crashes and signal-kills — which the tracepoint never saw — are captured. Gated to the + thread-group leader: one event per process, not per thread. +- **LLM model + tokens** — `AgentEvent::LlmApi {model, prompt_tokens, completion_tokens}`, parsed + in userspace from the opt-in TLS content: which model the agent called + token usage. Pairs + with the raw `SslContent`. +- **Destination port on `Egress`** — the service class the agent dials (443 API / 22 SSH / 5432 + Postgres / 6379 Redis / 11434 Ollama). The port was already read in-kernel and discarded. +- **UID on `ToolExec`** — the real UID a tool runs as (0 = root): privilege / privesc visibility. + +### Fixed + +- `build.rs` now emits `rerun-if-changed` for the eBPF crate, so a probe-source-only change no + longer reuses stale bytecode. + +### Tested + +- Each signal validated live on the production cluster (crash/kill/OOM; LlmApi model over TLS; + port 6443/etcd/redis; uid root/service/nobody). An adversarial fan-out review caught + fixed a + per-thread `ProcessExit` duplication regression (multithreaded agents) before release; the + untrusted-input LLM parser passed a 50M-iteration fuzz. + ## [0.9.3] — soak validation + test coverage (no runtime change) ### Tested diff --git a/Cargo.lock b/Cargo.lock index ba44fbe..c2615de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,7 +4,7 @@ version = 4 [[package]] name = "a3s-observer" -version = "0.9.3" +version = "0.10.0" dependencies = [ "serde", "serde_json", @@ -13,7 +13,7 @@ dependencies = [ [[package]] name = "a3s-observer-collector" -version = "0.9.3" +version = "0.10.0" dependencies = [ "a3s-observer", "a3s-observer-common", @@ -28,11 +28,11 @@ dependencies = [ [[package]] name = "a3s-observer-common" -version = "0.9.3" +version = "0.10.0" [[package]] name = "a3s-observer-ebpf" -version = "0.9.3" +version = "0.10.0" dependencies = [ "a3s-observer-common", "aya-ebpf", diff --git a/Cargo.toml b/Cargo.toml index 6bf77a8..f212df6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "a3s-observer" -version = "0.9.3" +version = "0.10.0" edition = "2021" license = "MIT" description = "General-purpose, language-agnostic eBPF observability for AI agents (LLM calls, tools, files, network egress)." diff --git a/README.md b/README.md index fef6411..ccdb8fa 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ latency / TTFT, or plaintext) / **where** (peer IP / hostname). | signal | kernel hook | event | |---|---|---| | `exec` | `sys_enter_execve` | `ToolExec` — tool / subprocess: **full argv** + cwd, comm, uid | -| `exit` | `sys_enter_exit_group` | `ProcessExit` — tool outcome + **exit code** (clean exits) | +| `exit` | `do_exit` kprobe | `ProcessExit` — outcome: **exit code + signal** (clean / SIGSEGV crash / SIGKILL-OOM), one per process | | `connect` | `sys_enter_connect` | `Egress` — peer IP:port | | `sni` | TLS ClientHello (plaintext `server_name`) | LLM **provider** + endpoint | | `dns` | `sendto` / `sendmsg` / `sendmmsg` to :53 | `Dns` — resolved hostname | @@ -52,6 +52,7 @@ latency / TTFT, or plaintext) / **where** (peer IP / hostname). | `file`\* | `sys_enter_openat` (write opens) | `FileAccess` — files written (`A3S_OBSERVER_FILES=1`) | | `unlink`\* | `sys_enter_unlinkat` | `FileDelete` — files deleted (`A3S_OBSERVER_FILES=1`) | | `ssl`\* | OpenSSL `SSL_write` / `SSL_read` uprobes | `SslContent` — request/response plaintext (`A3S_OBSERVER_SSL=1`) | +| `llm-api`\* | parsed from `SslContent` | `LlmApi` — **model** + token usage (`A3S_OBSERVER_SSL=1`) | Userspace enriches each event with **identity** (k8s cgroup→pod, `/proc` comm+ppid, or an in-kernel `comm` fallback for short-lived processes), a `(pid,fd)→peer` **correlation**, and diff --git a/a3s-observer-collector/Cargo.toml b/a3s-observer-collector/Cargo.toml index fcf78d6..e53f2ce 100644 --- a/a3s-observer-collector/Cargo.toml +++ b/a3s-observer-collector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "a3s-observer-collector" -version = "0.9.3" +version = "0.10.0" edition = "2021" license = "MIT" description = "a3s-observer collector: loads the eBPF probes and exports enriched events." diff --git a/a3s-observer-collector/build.rs b/a3s-observer-collector/build.rs index ad97346..f8b3ae8 100644 --- a/a3s-observer-collector/build.rs +++ b/a3s-observer-collector/build.rs @@ -6,6 +6,10 @@ use aya_build::{Package, Toolchain}; fn main() -> anyhow::Result<()> { let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")?; let ebpf_dir = format!("{manifest_dir}/../a3s-observer-ebpf"); + // aya_build doesn't emit rerun-if for the eBPF crate, so a source-only change to the probes + // would silently reuse stale bytecode. Track it explicitly. + println!("cargo:rerun-if-changed={ebpf_dir}/src"); + println!("cargo:rerun-if-changed={ebpf_dir}/Cargo.toml"); aya_build::build_ebpf( [Package { name: "a3s-observer-ebpf", diff --git a/a3s-observer-collector/src/main.rs b/a3s-observer-collector/src/main.rs index b7bd96d..094dbed 100644 --- a/a3s-observer-collector/src/main.rs +++ b/a3s-observer-collector/src/main.rs @@ -17,7 +17,7 @@ use a3s_observer_common::{ use anyhow::Context as _; use aya::{ maps::{PerCpuArray, RingBuf}, - programs::{TracePoint, UProbe}, + programs::{KProbe, TracePoint, UProbe}, Ebpf, }; use std::collections::HashMap; @@ -63,7 +63,6 @@ async fn main() -> anyhow::Result<()> { let files = std::env::var_os("A3S_OBSERVER_FILES").is_some(); let mut probes = vec![ ("exec", "sys_enter_execve"), - ("proc_exit", "sys_enter_exit_group"), ("tls_write", "sys_enter_write"), ("tls_sendto", "sys_enter_sendto"), ("connect", "sys_enter_connect"), @@ -91,6 +90,14 @@ async fn main() -> anyhow::Result<()> { } } } + // proc_exit is a do_exit kprobe (not a tracepoint): do_exit fires for EVERY task exit, + // including signal-kills (crash / OOM) that sys_enter_exit_group never sees. + match attach_kprobe(&mut ebpf, "proc_exit", "do_exit") { + Ok(()) => attached += 1, + Err(e) => { + tracing::warn!(error = %e, "proc_exit (do_exit kprobe) failed — exit signals unavailable") + } + } if attached == 0 { anyhow::bail!("no eBPF probes could be attached"); } @@ -132,7 +139,7 @@ async fn main() -> anyhow::Result<()> { let classifier = SniClassifier; let resolver = KubeResolver; // cgroup→pod in k8s; falls back to comm on bare hosts // (pid,fd) -> peer, populated by connect, read by the TLS probe to fuse provider+peer. - let mut peers: HashMap = HashMap::new(); + let mut peers: HashMap = HashMap::new(); // (pid,fd) -> (sni, provider, peer): recorded at ClientHello, read when the socket // closes (the in-kernel LlmEvent) to build the metric-bearing LlmCall. let mut llm_meta: HashMap, Option, IpAddr)> = HashMap::new(); @@ -232,6 +239,7 @@ async fn main() -> anyhow::Result<()> { event: AgentEvent::ToolExec { pid: ev.pid, ppid: read_ppid(ev.pid), + uid: ev.uid, argv: argv_of(&ev), cwd: read_cwd(ev.pid), }, @@ -246,6 +254,7 @@ async fn main() -> anyhow::Result<()> { event: AgentEvent::ProcessExit { pid: ev.pid, exit_code: ev.exit_code, + signal: ev.signal, }, }); } @@ -257,7 +266,7 @@ async fn main() -> anyhow::Result<()> { if peers.len() > 8192 { peers.clear(); // ponytail: crude cap; LRU if it ever matters } - peers.insert(sock_key(ev.pid, ev.fd), peer); + peers.insert(sock_key(ev.pid, ev.fd), (peer, ev.port)); emit(exporter.as_ref(), &mut stats, EnrichedEvent { identity: identity_for(&resolver, ev.pid, &ev.comm), provider: None, @@ -265,6 +274,7 @@ async fn main() -> anyhow::Result<()> { pid: ev.pid, sni: None, peer, + port: ev.port, bytes: 0, }, }); @@ -275,10 +285,10 @@ async fn main() -> anyhow::Result<()> { let len = (ev.len as usize).min(ev.data.len()); let sni = parse_sni(&ev.data[..len]); // Correlated peer for this socket (the LLM endpoint). - let peer = peers + let (peer, port) = peers .get(&sock_key(ev.pid, ev.fd)) .copied() - .unwrap_or(UNKNOWN_PEER); + .unwrap_or((UNKNOWN_PEER, 0)); let provider = sni.as_deref().and_then(|h| classifier.classify(Some(h), peer)); // Remember the call so the close event can build a metric-bearing @@ -294,6 +304,7 @@ async fn main() -> anyhow::Result<()> { pid: ev.pid, sni, peer, + port, bytes: ev.len as u64, }, }); @@ -364,8 +375,25 @@ async fn main() -> anyhow::Result<()> { let len = (ev.len as usize).min(ev.data.len()); let content = String::from_utf8_lossy(&ev.data[..len]).into_owned(); if !content.is_empty() { + let identity = identity_for(&resolver, ev.pid, &ev.comm); + // Structured LLM telemetry (model/tokens) alongside the raw content. + if let Some((model, prompt_tokens, completion_tokens)) = + parse_llm_meta(&content) + { + emit(exporter.as_ref(), &mut stats, EnrichedEvent { + identity: identity.clone(), + provider: None, + event: AgentEvent::LlmApi { + pid: ev.pid, + is_request: ev.is_read == 0, + model, + prompt_tokens, + completion_tokens, + }, + }); + } emit(exporter.as_ref(), &mut stats, EnrichedEvent { - identity: identity_for(&resolver, ev.pid, &ev.comm), + identity, provider: None, event: AgentEvent::SslContent { pid: ev.pid, @@ -406,6 +434,17 @@ fn attach(ebpf: &mut Ebpf, prog: &str, category: &str, name: &str) -> anyhow::Re Ok(()) } +fn attach_kprobe(ebpf: &mut Ebpf, prog: &str, sym: &str) -> anyhow::Result<()> { + let p: &mut KProbe = ebpf + .program_mut(prog) + .with_context(|| format!("`{prog}` program not found"))? + .try_into()?; + p.load()?; + p.attach(sym, 0) + .with_context(|| format!("attach kprobe {sym}"))?; + Ok(()) +} + fn attach_uprobe(ebpf: &mut Ebpf, prog: &str, sym: &str, target: &str) -> anyhow::Result<()> { let p: &mut UProbe = ebpf .program_mut(prog) @@ -486,6 +525,7 @@ fn emit(exporter: &dyn Exporter, stats: &mut Stats, ev: EnrichedEvent) { AgentEvent::FileDelete { .. } => stats.file += 1, AgentEvent::LlmCall { .. } => stats.llm += 1, AgentEvent::SslContent { .. } => stats.ssl += 1, + AgentEvent::LlmApi { .. } => stats.llm += 1, } exporter.export(&ev); } @@ -560,9 +600,33 @@ fn parse_dns_qname(buf: &[u8]) -> Option { (!name.is_empty()).then_some(name) } +/// Best-effort LLM-API fields from captured TLS plaintext: `"model"` from a request body, token +/// `usage` from a response. None if absent (not an LLM call, or the bytes weren't captured). +/// Consumes untrusted plaintext — every index is bounds-checked, must never panic. +fn parse_llm_meta(s: &str) -> Option<(Option, Option, Option)> { + let model = json_str_after(s, "\"model\""); + let pt = json_num_after(s, "\"prompt_tokens\""); + let ct = json_num_after(s, "\"completion_tokens\""); + (model.is_some() || pt.is_some() || ct.is_some()).then_some((model, pt, ct)) +} + +fn json_str_after(s: &str, key: &str) -> Option { + let rest = &s[s.find(key)? + key.len()..]; // find() ≤ len, +key.len() ≤ len → in-bounds + let body = &rest[rest.find('"')? + 1..]; // past the value's opening quote + Some(body[..body.find('"')?].to_owned()) +} + +fn json_num_after(s: &str, key: &str) -> Option { + let rest = s[s.find(key)? + key.len()..].trim_start_matches([':', ' ', '\t']); + let end = rest + .find(|c: char| !c.is_ascii_digit()) + .unwrap_or(rest.len()); + rest.get(..end)?.parse().ok() +} + #[cfg(test)] mod tests { - use super::{parse_dns_qname, parse_sni}; + use super::{parse_dns_qname, parse_llm_meta, parse_sni}; #[test] fn parses_sni_from_minimal_clienthello() { @@ -589,6 +653,16 @@ mod tests { assert_eq!(parse_sni(&[]), None); } + #[test] + fn parse_llm_meta_extracts_model_and_tokens() { + let req = r#"POST /v1/chat/completions HTTP/1.1 ... {"model":"gpt-4o","messages":[{"role":"user","content":"hi"}]}"#; + assert_eq!(parse_llm_meta(req).unwrap().0.as_deref(), Some("gpt-4o")); + let resp = r#"{"id":"x","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":34}}"#; + let (_, pt, ct) = parse_llm_meta(resp).unwrap(); + assert_eq!((pt, ct), (Some(12), Some(34))); + assert!(parse_llm_meta("just plaintext, no json fields here").is_none()); + } + #[test] fn parses_dns_query_name() { let mut q = vec![0u8; 12]; // header diff --git a/a3s-observer-common/Cargo.toml b/a3s-observer-common/Cargo.toml index 68474f8..2cc02a6 100644 --- a/a3s-observer-common/Cargo.toml +++ b/a3s-observer-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "a3s-observer-common" -version = "0.9.3" +version = "0.10.0" edition = "2021" license = "MIT" description = "Shared no_std types crossing the eBPF <-> userspace boundary for a3s-observer." diff --git a/a3s-observer-common/src/lib.rs b/a3s-observer-common/src/lib.rs index d572893..e52d8ee 100644 --- a/a3s-observer-common/src/lib.rs +++ b/a3s-observer-common/src/lib.rs @@ -27,12 +27,14 @@ pub struct ExecEvent { /// A process exit (`sys_enter_exit_group`) — the other end of the tool lifecycle, carrying the /// exit status so tool *outcomes* are visible (did the command succeed?), not just that it ran. -/// Catches clean exits (the C runtime's `exit()` calls `exit_group`); signal kills don't. +/// Captured via a `do_exit` kprobe, so it catches EVERY exit — clean exits and signal-kills +/// (crash / SIGKILL / OOM) alike. #[repr(C)] #[derive(Clone, Copy)] pub struct ExitEvent { pub pid: u32, - pub exit_code: u32, // exit_group status; the low byte is the code passed to exit() + pub exit_code: u32, // exit() status (0 when terminated by a signal) + pub signal: u32, // terminating signal, 0 = clean exit (9 SIGKILL/OOM, 11 SIGSEGV crash, …) pub comm: [u8; 16], } diff --git a/a3s-observer-ebpf/Cargo.toml b/a3s-observer-ebpf/Cargo.toml index 40113e8..e9c70a9 100644 --- a/a3s-observer-ebpf/Cargo.toml +++ b/a3s-observer-ebpf/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "a3s-observer-ebpf" -version = "0.9.3" +version = "0.10.0" edition = "2021" license = "MIT" publish = false diff --git a/a3s-observer-ebpf/src/main.rs b/a3s-observer-ebpf/src/main.rs index 8ec7927..ce4965e 100644 --- a/a3s-observer-ebpf/src/main.rs +++ b/a3s-observer-ebpf/src/main.rs @@ -11,7 +11,7 @@ use aya_ebpf::{ bpf_get_current_comm, bpf_get_current_pid_tgid, bpf_get_current_uid_gid, bpf_ktime_get_ns, bpf_probe_read_user_buf, bpf_probe_read_user_str_bytes, }, - macros::{cgroup_sock_addr, map, tracepoint, uprobe, uretprobe}, + macros::{cgroup_sock_addr, kprobe, map, tracepoint, uprobe, uretprobe}, maps::{ring_buf::RingBufEntry, HashMap, PerCpuArray, RingBuf}, programs::{ProbeContext, RetProbeContext, SockAddrContext, TracePointContext}, }; @@ -148,23 +148,33 @@ fn try_exec(ctx: &TracePointContext) -> Result { Ok(0) } -// ---- process exit (sys_enter_exit_group) — the tool's outcome / exit code ---- +// ---- process exit (do_exit kprobe) — the tool's outcome: exit code AND terminating signal ---- -#[tracepoint] -pub fn proc_exit(ctx: TracePointContext) -> u32 { +#[kprobe] +pub fn proc_exit(ctx: ProbeContext) -> u32 { try_proc_exit(&ctx).unwrap_or(0) } -fn try_proc_exit(ctx: &TracePointContext) -> Result { +// do_exit(long code) fires for EVERY task exit, including signal-kills (SIGSEGV crash, SIGKILL / +// OOM) that never call exit_group. `code` is the wait-status: low 7 bits = terminating signal, +// (code >> 8) & 0xff = the exit() status. +fn try_proc_exit(ctx: &ProbeContext) -> Result { + // do_exit fires per-THREAD; emit once per PROCESS by gating on the thread-group leader + // (tgid == task pid). Without this a multithreaded agent emits N duplicate ProcessExit/pid. + let id = bpf_get_current_pid_tgid(); + if (id >> 32) as u32 != id as u32 { + return Ok(0); + } + let code: u64 = ctx.arg(0).unwrap_or(0); let Some(mut entry) = reserve_or_drop::(&EXIT_EVENTS) else { return Ok(0); }; let ev = entry.as_mut_ptr(); unsafe { - (*ev).pid = (bpf_get_current_pid_tgid() >> 32) as u32; + (*ev).pid = (id >> 32) as u32; (*ev).comm = bpf_get_current_comm().unwrap_or_default(); - // sys_enter_exit_group: `long error_code` at offset 16 (low byte = the exit() code). - (*ev).exit_code = ctx.read_at::(16).unwrap_or(0) as u32; + (*ev).exit_code = ((code >> 8) & 0xff) as u32; + (*ev).signal = (code & 0x7f) as u32; // & 0x7f intentionally drops the 0x80 core-dump bit } entry.submit(0); Ok(0) diff --git a/deploy/daemonset.yaml b/deploy/daemonset.yaml index 082387a..e556749 100644 --- a/deploy/daemonset.yaml +++ b/deploy/daemonset.yaml @@ -25,7 +25,7 @@ spec: terminationGracePeriodSeconds: 30 # the collector flushes a final report on SIGTERM containers: - name: a3s-observer - image: 10.12.111.133:49164/a3s/observer:0.9.2 # mirror of ghcr.io/a3s-lab/observer:0.9.2 + image: 10.12.111.133:49164/a3s/observer:0.10.0 # mirror of ghcr.io/a3s-lab/observer:0.10.0 securityContext: # eBPF load + tracepoint attach requires privileged. (Verified: a non-root # process with only CAP_BPF+CAP_PERFMON fails to attach — the tracefs tracepoint diff --git a/src/model.rs b/src/model.rs index b0e13ef..d94eadf 100644 --- a/src/model.rs +++ b/src/model.rs @@ -13,12 +13,19 @@ pub enum AgentEvent { ToolExec { pid: u32, ppid: u32, + /// Real UID the tool runs as (0 = root) — surfaces privilege / privesc. + uid: u32, argv: Vec, cwd: String, }, - /// A process exited (`sys_enter_exit_group`) — the tool's outcome / exit code. Pairs with - /// `ToolExec` to bracket a tool's lifecycle (started → finished with this status). - ProcessExit { pid: u32, exit_code: u32 }, + /// A process exited (`do_exit` kprobe) — the tool's outcome: exit code AND terminating signal + /// (0 = clean; 9 = SIGKILL/OOM; 11 = SIGSEGV crash). Pairs with `ToolExec` to bracket a tool's + /// lifecycle (started → finished / crashed / killed). + ProcessExit { + pid: u32, + exit_code: u32, + signal: u32, + }, /// A file was opened (`openat`). FileAccess { pid: u32, path: String, write: bool }, /// A file was deleted (`unlinkat`) — a destructive action; pairs with `FileAccess`. @@ -45,6 +52,8 @@ pub enum AgentEvent { pid: u32, sni: Option, peer: IpAddr, + /// Destination port (host order) — the service class: 443 API, 22 SSH, 5432 PG, 6379 Redis… + port: u16, bytes: u64, }, /// A DNS query — a hostname the process resolved (`sys_enter_sendto` to :53). @@ -59,6 +68,16 @@ pub enum AgentEvent { is_read: bool, content: String, }, + /// Structured LLM-API telemetry parsed from captured TLS content: `model` from the request + /// body, token `usage` from the response. Best-effort (depends on the bytes landing within the + /// snapshot); pairs with `SslContent` to turn raw plaintext into "which model, how many tokens". + LlmApi { + pid: u32, + is_request: bool, + model: Option, + prompt_tokens: Option, + completion_tokens: Option, + }, } /// An [`AgentEvent`] tagged with the resolved [`Identity`] and, for LLM calls, the diff --git a/src/traits.rs b/src/traits.rs index 8ce1b37..50fbc51 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -350,6 +350,7 @@ mod tests { event: AgentEvent::ProcessExit { pid: 1, exit_code: 0, + signal: 0, }, }; for _ in 0..50 {