Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,33 @@

All notable changes to a3s-observer will be documented in this file.

## [0.10.0] — richer signals: exit-signal, LLM model, dest-port, uid

### Added

- **Exit signal** — `ProcessExit` now carries `signal` (0 clean / 9 SIGKILL+OOM / 11 SIGSEGV
crash). The probe moved from the `sys_enter_exit_group` tracepoint to a `do_exit` kprobe, so
crashes and signal-kills — which the tracepoint never saw — are captured. Gated to the
thread-group leader: one event per process, not per thread.
- **LLM model + tokens** — `AgentEvent::LlmApi {model, prompt_tokens, completion_tokens}`, parsed
in userspace from the opt-in TLS content: which model the agent called + token usage. Pairs
with the raw `SslContent`.
- **Destination port on `Egress`** — the service class the agent dials (443 API / 22 SSH / 5432
Postgres / 6379 Redis / 11434 Ollama). The port was already read in-kernel and discarded.
- **UID on `ToolExec`** — the real UID a tool runs as (0 = root): privilege / privesc visibility.

### Fixed

- `build.rs` now emits `rerun-if-changed` for the eBPF crate, so a probe-source-only change no
longer reuses stale bytecode.

### Tested

- Each signal validated live on the production cluster (crash/kill/OOM; LlmApi model over TLS;
port 6443/etcd/redis; uid root/service/nobody). An adversarial fan-out review caught + fixed a
per-thread `ProcessExit` duplication regression (multithreaded agents) before release; the
untrusted-input LLM parser passed a 50M-iteration fuzz.

## [0.9.3] — soak validation + test coverage (no runtime change)

### Tested
Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "a3s-observer"
version = "0.9.3"
version = "0.10.0"
edition = "2021"
license = "MIT"
description = "General-purpose, language-agnostic eBPF observability for AI agents (LLM calls, tools, files, network egress)."
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ latency / TTFT, or plaintext) / **where** (peer IP / hostname).
| signal | kernel hook | event |
|---|---|---|
| `exec` | `sys_enter_execve` | `ToolExec` — tool / subprocess: **full argv** + cwd, comm, uid |
| `exit` | `sys_enter_exit_group` | `ProcessExit` — tool outcome + **exit code** (clean exits) |
| `exit` | `do_exit` kprobe | `ProcessExit` — outcome: **exit code + signal** (clean / SIGSEGV crash / SIGKILL-OOM), one per process |
| `connect` | `sys_enter_connect` | `Egress` — peer IP:port |
| `sni` | TLS ClientHello (plaintext `server_name`) | LLM **provider** + endpoint |
| `dns` | `sendto` / `sendmsg` / `sendmmsg` to :53 | `Dns` — resolved hostname |
| llm metrics | per-socket `read`/`recv` + `close` | `LlmCall` — req/resp wire bytes, latency, TTFT |
| `file`\* | `sys_enter_openat` (write opens) | `FileAccess` — files written (`A3S_OBSERVER_FILES=1`) |
| `unlink`\* | `sys_enter_unlinkat` | `FileDelete` — files deleted (`A3S_OBSERVER_FILES=1`) |
| `ssl`\* | OpenSSL `SSL_write` / `SSL_read` uprobes | `SslContent` — request/response plaintext (`A3S_OBSERVER_SSL=1`) |
| `llm-api`\* | parsed from `SslContent` | `LlmApi` — **model** + token usage (`A3S_OBSERVER_SSL=1`) |

Userspace enriches each event with **identity** (k8s cgroup→pod, `/proc` comm+ppid, or an
in-kernel `comm` fallback for short-lived processes), a `(pid,fd)→peer` **correlation**, and
Expand Down
2 changes: 1 addition & 1 deletion a3s-observer-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "a3s-observer-collector"
version = "0.9.3"
version = "0.10.0"
edition = "2021"
license = "MIT"
description = "a3s-observer collector: loads the eBPF probes and exports enriched events."
Expand Down
4 changes: 4 additions & 0 deletions a3s-observer-collector/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use aya_build::{Package, Toolchain};
fn main() -> anyhow::Result<()> {
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")?;
let ebpf_dir = format!("{manifest_dir}/../a3s-observer-ebpf");
// aya_build doesn't emit rerun-if for the eBPF crate, so a source-only change to the probes
// would silently reuse stale bytecode. Track it explicitly.
println!("cargo:rerun-if-changed={ebpf_dir}/src");
println!("cargo:rerun-if-changed={ebpf_dir}/Cargo.toml");
aya_build::build_ebpf(
[Package {
name: "a3s-observer-ebpf",
Expand Down
90 changes: 82 additions & 8 deletions a3s-observer-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use a3s_observer_common::{
use anyhow::Context as _;
use aya::{
maps::{PerCpuArray, RingBuf},
programs::{TracePoint, UProbe},
programs::{KProbe, TracePoint, UProbe},
Ebpf,
};
use std::collections::HashMap;
Expand Down Expand Up @@ -63,7 +63,6 @@ async fn main() -> anyhow::Result<()> {
let files = std::env::var_os("A3S_OBSERVER_FILES").is_some();
let mut probes = vec![
("exec", "sys_enter_execve"),
("proc_exit", "sys_enter_exit_group"),
("tls_write", "sys_enter_write"),
("tls_sendto", "sys_enter_sendto"),
("connect", "sys_enter_connect"),
Expand Down Expand Up @@ -91,6 +90,14 @@ async fn main() -> anyhow::Result<()> {
}
}
}
// proc_exit is a do_exit kprobe (not a tracepoint): do_exit fires for EVERY task exit,
// including signal-kills (crash / OOM) that sys_enter_exit_group never sees.
match attach_kprobe(&mut ebpf, "proc_exit", "do_exit") {
Ok(()) => attached += 1,
Err(e) => {
tracing::warn!(error = %e, "proc_exit (do_exit kprobe) failed — exit signals unavailable")
}
}
if attached == 0 {
anyhow::bail!("no eBPF probes could be attached");
}
Expand Down Expand Up @@ -132,7 +139,7 @@ async fn main() -> anyhow::Result<()> {
let classifier = SniClassifier;
let resolver = KubeResolver; // cgroup→pod in k8s; falls back to comm on bare hosts
// (pid,fd) -> peer, populated by connect, read by the TLS probe to fuse provider+peer.
let mut peers: HashMap<u64, IpAddr> = HashMap::new();
let mut peers: HashMap<u64, (IpAddr, u16)> = HashMap::new();
// (pid,fd) -> (sni, provider, peer): recorded at ClientHello, read when the socket
// closes (the in-kernel LlmEvent) to build the metric-bearing LlmCall.
let mut llm_meta: HashMap<u64, (Option<String>, Option<Provider>, IpAddr)> = HashMap::new();
Expand Down Expand Up @@ -232,6 +239,7 @@ async fn main() -> anyhow::Result<()> {
event: AgentEvent::ToolExec {
pid: ev.pid,
ppid: read_ppid(ev.pid),
uid: ev.uid,
argv: argv_of(&ev),
cwd: read_cwd(ev.pid),
},
Expand All @@ -246,6 +254,7 @@ async fn main() -> anyhow::Result<()> {
event: AgentEvent::ProcessExit {
pid: ev.pid,
exit_code: ev.exit_code,
signal: ev.signal,
},
});
}
Expand All @@ -257,14 +266,15 @@ async fn main() -> anyhow::Result<()> {
if peers.len() > 8192 {
peers.clear(); // ponytail: crude cap; LRU if it ever matters
}
peers.insert(sock_key(ev.pid, ev.fd), peer);
peers.insert(sock_key(ev.pid, ev.fd), (peer, ev.port));
emit(exporter.as_ref(), &mut stats, EnrichedEvent {
identity: identity_for(&resolver, ev.pid, &ev.comm),
provider: None,
event: AgentEvent::Egress {
pid: ev.pid,
sni: None,
peer,
port: ev.port,
bytes: 0,
},
});
Expand All @@ -275,10 +285,10 @@ async fn main() -> anyhow::Result<()> {
let len = (ev.len as usize).min(ev.data.len());
let sni = parse_sni(&ev.data[..len]);
// Correlated peer for this socket (the LLM endpoint).
let peer = peers
let (peer, port) = peers
.get(&sock_key(ev.pid, ev.fd))
.copied()
.unwrap_or(UNKNOWN_PEER);
.unwrap_or((UNKNOWN_PEER, 0));
let provider =
sni.as_deref().and_then(|h| classifier.classify(Some(h), peer));
// Remember the call so the close event can build a metric-bearing
Expand All @@ -294,6 +304,7 @@ async fn main() -> anyhow::Result<()> {
pid: ev.pid,
sni,
peer,
port,
bytes: ev.len as u64,
},
});
Expand Down Expand Up @@ -364,8 +375,25 @@ async fn main() -> anyhow::Result<()> {
let len = (ev.len as usize).min(ev.data.len());
let content = String::from_utf8_lossy(&ev.data[..len]).into_owned();
if !content.is_empty() {
let identity = identity_for(&resolver, ev.pid, &ev.comm);
// Structured LLM telemetry (model/tokens) alongside the raw content.
if let Some((model, prompt_tokens, completion_tokens)) =
parse_llm_meta(&content)
{
emit(exporter.as_ref(), &mut stats, EnrichedEvent {
identity: identity.clone(),
provider: None,
event: AgentEvent::LlmApi {
pid: ev.pid,
is_request: ev.is_read == 0,
model,
prompt_tokens,
completion_tokens,
},
});
}
emit(exporter.as_ref(), &mut stats, EnrichedEvent {
identity: identity_for(&resolver, ev.pid, &ev.comm),
identity,
provider: None,
event: AgentEvent::SslContent {
pid: ev.pid,
Expand Down Expand Up @@ -406,6 +434,17 @@ fn attach(ebpf: &mut Ebpf, prog: &str, category: &str, name: &str) -> anyhow::Re
Ok(())
}

fn attach_kprobe(ebpf: &mut Ebpf, prog: &str, sym: &str) -> anyhow::Result<()> {
let p: &mut KProbe = ebpf
.program_mut(prog)
.with_context(|| format!("`{prog}` program not found"))?
.try_into()?;
p.load()?;
p.attach(sym, 0)
.with_context(|| format!("attach kprobe {sym}"))?;
Ok(())
}

fn attach_uprobe(ebpf: &mut Ebpf, prog: &str, sym: &str, target: &str) -> anyhow::Result<()> {
let p: &mut UProbe = ebpf
.program_mut(prog)
Expand Down Expand Up @@ -486,6 +525,7 @@ fn emit(exporter: &dyn Exporter, stats: &mut Stats, ev: EnrichedEvent) {
AgentEvent::FileDelete { .. } => stats.file += 1,
AgentEvent::LlmCall { .. } => stats.llm += 1,
AgentEvent::SslContent { .. } => stats.ssl += 1,
AgentEvent::LlmApi { .. } => stats.llm += 1,
}
exporter.export(&ev);
}
Expand Down Expand Up @@ -560,9 +600,33 @@ fn parse_dns_qname(buf: &[u8]) -> Option<String> {
(!name.is_empty()).then_some(name)
}

/// Best-effort LLM-API fields from captured TLS plaintext: `"model"` from a request body, token
/// `usage` from a response. None if absent (not an LLM call, or the bytes weren't captured).
/// Consumes untrusted plaintext — every index is bounds-checked, must never panic.
fn parse_llm_meta(s: &str) -> Option<(Option<String>, Option<u32>, Option<u32>)> {
let model = json_str_after(s, "\"model\"");
let pt = json_num_after(s, "\"prompt_tokens\"");
let ct = json_num_after(s, "\"completion_tokens\"");
(model.is_some() || pt.is_some() || ct.is_some()).then_some((model, pt, ct))
}

fn json_str_after(s: &str, key: &str) -> Option<String> {
let rest = &s[s.find(key)? + key.len()..]; // find() ≤ len, +key.len() ≤ len → in-bounds
let body = &rest[rest.find('"')? + 1..]; // past the value's opening quote
Some(body[..body.find('"')?].to_owned())
}

fn json_num_after(s: &str, key: &str) -> Option<u32> {
let rest = s[s.find(key)? + key.len()..].trim_start_matches([':', ' ', '\t']);
let end = rest
.find(|c: char| !c.is_ascii_digit())
.unwrap_or(rest.len());
rest.get(..end)?.parse().ok()
}

#[cfg(test)]
mod tests {
use super::{parse_dns_qname, parse_sni};
use super::{parse_dns_qname, parse_llm_meta, parse_sni};

#[test]
fn parses_sni_from_minimal_clienthello() {
Expand All @@ -589,6 +653,16 @@ mod tests {
assert_eq!(parse_sni(&[]), None);
}

#[test]
fn parse_llm_meta_extracts_model_and_tokens() {
let req = r#"POST /v1/chat/completions HTTP/1.1 ... {"model":"gpt-4o","messages":[{"role":"user","content":"hi"}]}"#;
assert_eq!(parse_llm_meta(req).unwrap().0.as_deref(), Some("gpt-4o"));
let resp = r#"{"id":"x","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":34}}"#;
let (_, pt, ct) = parse_llm_meta(resp).unwrap();
assert_eq!((pt, ct), (Some(12), Some(34)));
assert!(parse_llm_meta("just plaintext, no json fields here").is_none());
}

#[test]
fn parses_dns_query_name() {
let mut q = vec![0u8; 12]; // header
Expand Down
2 changes: 1 addition & 1 deletion a3s-observer-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "a3s-observer-common"
version = "0.9.3"
version = "0.10.0"
edition = "2021"
license = "MIT"
description = "Shared no_std types crossing the eBPF <-> userspace boundary for a3s-observer."
Expand Down
6 changes: 4 additions & 2 deletions a3s-observer-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ pub struct ExecEvent {

/// A process exit (`sys_enter_exit_group`) — the other end of the tool lifecycle, carrying the
/// exit status so tool *outcomes* are visible (did the command succeed?), not just that it ran.
/// Catches clean exits (the C runtime's `exit()` calls `exit_group`); signal kills don't.
/// Captured via a `do_exit` kprobe, so it catches EVERY exit — clean exits and signal-kills
/// (crash / SIGKILL / OOM) alike.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct ExitEvent {
pub pid: u32,
pub exit_code: u32, // exit_group status; the low byte is the code passed to exit()
pub exit_code: u32, // exit() status (0 when terminated by a signal)
pub signal: u32, // terminating signal, 0 = clean exit (9 SIGKILL/OOM, 11 SIGSEGV crash, …)
pub comm: [u8; 16],
}

Expand Down
2 changes: 1 addition & 1 deletion a3s-observer-ebpf/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "a3s-observer-ebpf"
version = "0.9.3"
version = "0.10.0"
edition = "2021"
license = "MIT"
publish = false
Expand Down
26 changes: 18 additions & 8 deletions a3s-observer-ebpf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use aya_ebpf::{
bpf_get_current_comm, bpf_get_current_pid_tgid, bpf_get_current_uid_gid, bpf_ktime_get_ns,
bpf_probe_read_user_buf, bpf_probe_read_user_str_bytes,
},
macros::{cgroup_sock_addr, map, tracepoint, uprobe, uretprobe},
macros::{cgroup_sock_addr, kprobe, map, tracepoint, uprobe, uretprobe},
maps::{ring_buf::RingBufEntry, HashMap, PerCpuArray, RingBuf},
programs::{ProbeContext, RetProbeContext, SockAddrContext, TracePointContext},
};
Expand Down Expand Up @@ -148,23 +148,33 @@ fn try_exec(ctx: &TracePointContext) -> Result<u32, i64> {
Ok(0)
}

// ---- process exit (sys_enter_exit_group) — the tool's outcome / exit code ----
// ---- process exit (do_exit kprobe) — the tool's outcome: exit code AND terminating signal ----

#[tracepoint]
pub fn proc_exit(ctx: TracePointContext) -> u32 {
#[kprobe]
pub fn proc_exit(ctx: ProbeContext) -> u32 {
try_proc_exit(&ctx).unwrap_or(0)
}

fn try_proc_exit(ctx: &TracePointContext) -> Result<u32, i64> {
// do_exit(long code) fires for EVERY task exit, including signal-kills (SIGSEGV crash, SIGKILL /
// OOM) that never call exit_group. `code` is the wait-status: low 7 bits = terminating signal,
// (code >> 8) & 0xff = the exit() status.
fn try_proc_exit(ctx: &ProbeContext) -> Result<u32, i64> {
// do_exit fires per-THREAD; emit once per PROCESS by gating on the thread-group leader
// (tgid == task pid). Without this a multithreaded agent emits N duplicate ProcessExit/pid.
let id = bpf_get_current_pid_tgid();
if (id >> 32) as u32 != id as u32 {
return Ok(0);
}
let code: u64 = ctx.arg(0).unwrap_or(0);
let Some(mut entry) = reserve_or_drop::<ExitEvent>(&EXIT_EVENTS) else {
return Ok(0);
};
let ev = entry.as_mut_ptr();
unsafe {
(*ev).pid = (bpf_get_current_pid_tgid() >> 32) as u32;
(*ev).pid = (id >> 32) as u32;
(*ev).comm = bpf_get_current_comm().unwrap_or_default();
// sys_enter_exit_group: `long error_code` at offset 16 (low byte = the exit() code).
(*ev).exit_code = ctx.read_at::<u64>(16).unwrap_or(0) as u32;
(*ev).exit_code = ((code >> 8) & 0xff) as u32;
(*ev).signal = (code & 0x7f) as u32; // & 0x7f intentionally drops the 0x80 core-dump bit
}
entry.submit(0);
Ok(0)
Expand Down
Loading
Loading