feat(hooks): add UnixSocketHookSink for real-time event streaming#2333
feat(hooks): add UnixSocketHookSink for real-time event streaming#2333lihuan215 wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a UnixSocketHookSink to send lifecycle events over a Unix domain socket for external monitoring, along with configuration support via a new [hooks] table. While the feature is useful, several issues must be addressed: a critical deserialization bug caused by #[serde(flatten)] that breaks configuration parsing, a security risk from using a default socket path in /tmp, performance overhead from reconnecting to the socket on every event emission, and a test failure due to a missing parent directory creation before binding.
| #[serde(flatten)] | ||
| /// Hook sink configuration. When absent, the runtime defaults to | ||
| /// stdout + JSONL sinks only (current behaviour). | ||
| #[serde(default)] | ||
| pub hooks: Option<HooksToml>, |
There was a problem hiding this comment.
Using #[serde(flatten)] on the hooks field causes Serde to look for the HooksToml fields (like sink, socket_path, etc.) at the root level of the configuration instead of nesting them under a [hooks] table. This completely breaks parsing when the configuration is written as [hooks] in config.toml (as documented in the PR description), resulting in config.hooks being None and the sink never being initialized. Removing #[serde(flatten)] resolves this issue.
| #[serde(flatten)] | |
| /// Hook sink configuration. When absent, the runtime defaults to | |
| /// stdout + JSONL sinks only (current behaviour). | |
| #[serde(default)] | |
| pub hooks: Option<HooksToml>, | |
| /// Hook sink configuration. When absent, the runtime defaults to | |
| /// stdout + JSONL sinks only (current behaviour). | |
| #[serde(default)] | |
| pub hooks: Option<HooksToml>, |
| let socket_path = hooks_cfg.socket_path.clone() | ||
| .unwrap_or_else(|| "/tmp/codewhale-island.sock".to_string()); |
There was a problem hiding this comment.
Using a hardcoded socket path in /tmp (which is world-writable on Unix systems) poses a security risk of Socket Hijacking. A malicious local user could pre-create /tmp/codewhale-island.sock with permissive permissions or listen on it, allowing them to intercept sensitive lifecycle events (which contain LLM prompts, responses, and tool execution payloads). Placing the default socket path inside the user's home directory (e.g., ~/.codewhale/) ensures appropriate restricted permissions.
let socket_path = hooks_cfg.socket_path.clone()
.unwrap_or_else(|| {
codewhale_config::codewhale_home()
.map(|p| p.join("codewhale-island.sock").to_string_lossy().into_owned())
.unwrap_or_else(|_| "/tmp/codewhale-island.sock".to_string())
});| let mut stream = match tokio::net::UnixStream::connect(&self.path).await { | ||
| Ok(s) => s, | ||
| Err(_) => return Ok(()), // listener not running, skip silently | ||
| }; |
There was a problem hiding this comment.
Connecting to the Unix socket on every single emit call introduces significant performance overhead, especially for high-frequency events like ResponseDelta (which is triggered for every single token generated by the LLM). This can lead to high CPU usage and potential socket descriptor exhaustion. Consider caching/reusing the connection using a tokio::sync::Mutex<Option<tokio::net::UnixStream>> or utilizing a channel-based background worker to handle persistent connection management.
| let socket_path = unique_temp_dir("unix_sink").join("test.sock"); | ||
| let _ = std::fs::remove_file(&socket_path); | ||
|
|
||
| let listener = UnixListener::bind(&socket_path).expect("bind"); |
There was a problem hiding this comment.
The test unix_socket_sink_sends_event_to_listener will fail because unique_temp_dir only generates a path but does not actually create the directory. UnixListener::bind expects the parent directory of the socket path to already exist, otherwise it returns a NotFound error. Creating the parent directory before binding resolves this issue.
| let socket_path = unique_temp_dir("unix_sink").join("test.sock"); | |
| let _ = std::fs::remove_file(&socket_path); | |
| let listener = UnixListener::bind(&socket_path).expect("bind"); | |
| let socket_dir = unique_temp_dir("unix_sink"); | |
| std::fs::create_dir_all(&socket_dir).expect("create socket dir"); | |
| let socket_path = socket_dir.join("test.sock"); | |
| let _ = std::fs::remove_file(&socket_path); | |
| let listener = UnixListener::bind(&socket_path).expect("bind"); |
82ad6df to
dacd380
Compare
…external monitors
dacd380 to
30a09b4
Compare
|
wow - thank you. will work on getting this in. super appreciative |
Summary
Adds a new
UnixSocketHookSinkto the hooks crate, enabling real-time lifecycle event streaming over a Unix domain socket. This extends the hook system beyond stdout / JSONL file / webhook to support low-latency local IPC — useful for external monitoring dashboards, notification systems, and tool integration without polling files.Motivation
The current hook sinks all have trade-offs:
StdoutHookSinkJsonlHookSinkWebhookHookSinkNone of them offer local, push-based, near-zero-overhead event delivery. A Unix socket fills this gap — connect, receive JSON lines, disconnect. No filesystem polling. No HTTP server. No mixed stdout.
What this PR does
crates/hooks/src/lib.rs: AddsUnixSocketHookSinkimplementingHookSinkemit(stateless){"at": "...", "event": {...}}\nJSON lines#[cfg(unix)]-guarded; no-op on non-Unix platformscrates/config/src/lib.rs: Adds[hooks]config section (HooksToml)crates/app-server/src/lib.rs: Wires up the sink based on config[hooks] sink = "unix-socket", addsUnixSocketHookSinkto the dispatcher/tmp/codewhale-island.sockDesign Decisions
{"at": "...", "event": {...}}envelope as JsonlHookSink and WebhookHookSink.[hooks]to config.toml.Future Possibilities
This sink opens the door to:
ApprovalLifecycleevents)Greptile Summary
This PR adds a
UnixSocketHookSinkthat delivers lifecycle events as JSON lines over a Unix domain socket, wired up via a new[hooks]TOML config section. The sink uses a connect-per-event model and silently drops events when no listener is present.crates/hooks/src/lib.rs: NewUnixSocketHookSinkwith#[cfg(unix)]-guardedemit; connection errors are silently swallowed but write errors (e.g. broken-pipe when a listener crashes mid-write) are propagated, inconsistent with the stated best-effort design.crates/config/src/lib.rs: AddsHooksTomland ahooksfield toConfigToml;jsonl_pathandwebhook_urlfields are declared but never consumed byapp-server.crates/app-server/src/lib.rs: Reads the new config and conditionally adds the sink; only the exact string"unix-socket"is recognised — all other sink values are silently ignored.Confidence Score: 3/5
The new sink code is safe to run but has several open correctness issues across all three changed files that should be resolved before merging.
The write-error propagation in
UnixSocketHookSinkis inconsistent with the silent-skip contract. The test helperunique_temp_dirreturns a non-existent directory path, causingUnixListener::bindto fail with ENOENT. The config struct declaresjsonl_pathandwebhook_urlthat are parsed but never acted on, and anysinkvalue other than"unix-socket"is silently ignored inapp-server. Together these mean the feature doesn't fully behave as documented and the new socket test is broken on most environments.All three changed files need attention:
crates/hooks/src/lib.rsfor the error-handling inconsistency and test directory bug,crates/config/src/lib.rsfor the unused declared fields, andcrates/app-server/src/lib.rsfor the silent-ignore behaviour on unrecognised sink values.Important Files Changed
UnixSocketHookSinkwith connect-per-event design and#[cfg(unix)]-guarded emit; write errors are propagated while connection errors are silently swallowed (inconsistent), and the new test usesunique_temp_dirwhose directory is never created beforeUnixListener::bind.HooksTomlstruct andhooksfield toConfigToml; serde attribute placement keeps#[serde(flatten)]correctly onextras, butjsonl_pathandwebhook_urlfields are declared yet never consumed byapp-server.UnixSocketHookSinkfrom config; only the"unix-socket"string is handled — any other configured value is silently ignored with no warning to the user.Sequence Diagram
sequenceDiagram participant Engine as Engine (Runtime) participant Dispatcher as HookDispatcher participant Sink as UnixSocketHookSink participant OS as OS / Unix Socket participant Listener as External Listener Engine->>Dispatcher: emit(HookEvent) loop for each sink Dispatcher->>Sink: emit(event) Sink->>OS: UnixStream::connect(path) alt Listener not running OS-->>Sink: Err (ENOENT / ECONNREFUSED) Sink-->>Dispatcher: Ok(()) [silent skip] else Listener running OS-->>Sink: Ok(stream) Sink->>Sink: serde_json::to_string(payload) Sink->>OS: write_all(json_line + "\n") alt Write succeeds OS->>Listener: JSON line delivered Sink-->>Dispatcher: Ok(()) else Write fails (broken pipe) OS-->>Sink: Err Sink-->>Dispatcher: Err [propagated — inconsistent with silent-skip design] end Sink->>OS: stream dropped → FIN end Dispatcher->>Dispatcher: log error and continue (if Err) endReviews (2): Last reviewed commit: "feat(hooks): add UnixSocketHookSink for ..." | Re-trigger Greptile