From 6a4f56794e141ac05ca7088dbe796f230f64bc63 Mon Sep 17 00:00:00 2001 From: John Morrissey <544926+tachyon-beep@users.noreply.github.com> Date: Sat, 27 Jun 2026 12:49:55 +1000 Subject: [PATCH 1/2] feat(churn): consume Warpline's frozen churn read to light up high_churn/recently_changed The entity_high_churn_list / entity_recent_change_list MCP surfaces were dead-by-design: loomweave does not populate git_churn_count in v1.0 and, by the loomweave<->warpline seam HARD RULE, retains no cross-run history. This wires the read-time join to Warpline's FROZEN warpline_entity_churn_count_get (warpline.entity_churn_count.v1, 2026-06-13 interface lock SS1A / GV-LW-2). Read-only / enrich-only / dependency-sink: - New WarplineLookup trait (single method, the churn read) + WarplineMcpClient (MCP-stdio subprocess, mirrors the Filigree consumer pattern), injected as an Option> defaulting to None. Lives in loomweave-federation. - Candidate universe = the entity catalogue (new entities_for_churn_candidates query), NOT the empty git_churn_count scan; warpline holds the counts. Scope-filter BEFORE the warpline call, then rank the scoped set by the returned counts. SEI-keyed refs with locator fallback (never drops a candidate). One bounded call, joined at read time, retained nowhere. - Honest-degrade (lock SS1C): warpline disabled/unreachable -> honest-empty with a warpline-named missing-signal note + churn_source provenance; never empty-as-clean, never a hard error breaking the core flow. GV-LW-2 is an executable test: the full frozen envelope fixture parsed through the real parse path, driven via an injected fake (no live MCP call from the hub context). Both honest-degrade paths (disabled, unreachable) are tested. Blast radius: READS the frozen warpline_entity_churn_count_get contract only; mints no new sibling obligation (the producer already ships). Live cross-member validation against a real Warpline, deep-pagination beyond warpline's limit, and the >CHURN_SCAN_CAP single-frame request bound are tracked follow-ups. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/loomweave-cli/src/serve.rs | 14 + crates/loomweave-federation/src/config.rs | 46 ++ crates/loomweave-federation/src/lib.rs | 1 + crates/loomweave-federation/src/warpline.rs | 569 ++++++++++++++++++ .../loomweave-mcp/src/catalogue/shortcuts.rs | 395 +++++++++--- crates/loomweave-mcp/src/lib.rs | 25 +- crates/loomweave-mcp/src/warpline.rs | 1 + crates/loomweave-mcp/tests/catalogue_tools.rs | 41 +- .../tests/warpline_churn_consumer.rs | 352 +++++++++++ crates/loomweave-storage/src/lib.rs | 8 +- crates/loomweave-storage/src/query.rs | 19 + 11 files changed, 1386 insertions(+), 85 deletions(-) create mode 100644 crates/loomweave-federation/src/warpline.rs create mode 100644 crates/loomweave-mcp/src/warpline.rs create mode 100644 crates/loomweave-mcp/tests/warpline_churn_consumer.rs diff --git a/crates/loomweave-cli/src/serve.rs b/crates/loomweave-cli/src/serve.rs index 87356a82..d5378606 100644 --- a/crates/loomweave-cli/src/serve.rs +++ b/crates/loomweave-cli/src/serve.rs @@ -11,6 +11,7 @@ use loomweave_federation::config::{ select_provider_with_env, }; use loomweave_federation::filigree::FiligreeHttpClient; +use loomweave_federation::warpline::WarplineMcpClient; use loomweave_llm::{ ApiEmbeddingProvider, ApiEmbeddingProviderConfig, ClaudeCliProvider, ClaudeCliProviderConfig, CodexCliProvider, CodexCliProviderConfig, EmbeddingProvider, EmbeddingProviderError, @@ -91,6 +92,12 @@ pub fn run(path: &Path, config_path: Option<&Path>) -> Result<()> { ) .context("build Filigree HTTP client")?; + // Read-only Warpline churn consumer for the high-churn / recently-changed + // surfaces. `None` when disabled (the default) — the surfaces degrade + // honestly. Enrich-only, dependency-sink: nothing flows loomweave→warpline. + let warpline_client = + WarplineMcpClient::from_config(&config.integrations.warpline, Some(&project_root)); + let diagnostics = loomweave_mcp::DiagnosticsContext { llm: llm_diagnostics, filigree: filigree_resolution, @@ -127,6 +134,7 @@ pub fn run(path: &Path, config_path: Option<&Path>) -> Result<()> { llm_provider, semantic_search_state(&config.semantic_search, embedding_provider), filigree_client, + warpline_client, diagnostics, loomweave_mcp::McpToolPolicy { enable_write_tools: config.serve.mcp.enable_write_tools, @@ -199,6 +207,7 @@ fn spawn_mcp_stdio( llm_provider: Option>, semantic_search: Option, filigree_client: Option, + warpline_client: Option, diagnostics: loomweave_mcp::DiagnosticsContext, tool_policy: loomweave_mcp::McpToolPolicy, analyze_config_path: Option, @@ -215,6 +224,7 @@ fn spawn_mcp_stdio( llm_provider, semantic_search, filigree_client, + warpline_client, diagnostics, tool_policy, analyze_config_path, @@ -234,6 +244,7 @@ fn run_mcp_stdio( llm_provider: Option>, semantic_search: Option, filigree_client: Option, + warpline_client: Option, diagnostics: loomweave_mcp::DiagnosticsContext, tool_policy: loomweave_mcp::McpToolPolicy, analyze_config_path: Option, @@ -271,6 +282,9 @@ fn run_mcp_stdio( if let Some(client) = filigree_client { state = state.with_filigree_client(Arc::new(client)); } + if let Some(client) = warpline_client { + state = state.with_warpline_client(Arc::new(client)); + } state = state.with_diagnostics(diagnostics); let serve_result = loomweave_mcp::serve_stdio_with_state_on_runtime( diff --git a/crates/loomweave-federation/src/config.rs b/crates/loomweave-federation/src/config.rs index 36cb6eab..92917e50 100644 --- a/crates/loomweave-federation/src/config.rs +++ b/crates/loomweave-federation/src/config.rs @@ -550,6 +550,14 @@ impl ClaudePermissionMode { #[serde(default, deny_unknown_fields)] pub struct IntegrationsConfig { pub filigree: FiligreeConfig, + /// Warpline (the federation's temporal/change authority) churn-count read, + /// consumed at read time by `entity_high_churn_list` / + /// `entity_recent_change_list`. Read-only, enrich-only, dependency-sink: + /// loomweave never stores a warpline fact (the seam's HARD RULE — loomweave + /// retains no cross-run history; see the 2026-06-13 warpline interface lock + /// §1, §5). Default disabled — the churn surfaces stay honest-empty with a + /// missing-signal note until an operator opts in. + pub warpline: WarplineConfig, } #[derive(Debug, Clone, PartialEq, Default, Deserialize)] @@ -754,6 +762,44 @@ impl Default for FiligreeConfig { } } +/// Read-time consumption of Warpline's FROZEN churn-count read +/// (`warpline_entity_churn_count_get`, `warpline.entity_churn_count.v1`). This +/// is a *read-only* seam: loomweave asks warpline for per-entity change counts +/// to rank `entity_high_churn_list` / `entity_recent_change_list`, joins them at +/// read time, and retains NOTHING (the loomweave↔warpline HARD RULE — loomweave +/// holds no cross-run history). There is deliberately NO write/emit flag here: +/// unlike the Filigree seam, nothing flows loomweave→warpline. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct WarplineConfig { + /// Whether the churn surfaces consult warpline. Default `false`: the + /// surfaces stay honest-empty (with a missing-signal note naming warpline) + /// until an operator opts in. A missing/unreachable warpline with this + /// `true` degrades the same way — never an error, never empty-as-clean. + pub enabled: bool, + /// Actor identity carried on the warpline call. Warpline is an MCP-stdio + /// member (no HTTP read API), so it is launched as a subprocess and driven + /// over its MCP stdio transport — the same mechanism the Filigree MCP-tool + /// calls use. The command is resolved at call time (env override + /// `LOOMWEAVE_WARPLINE_MCP_COMMAND`, else the `warpline mcp` shim). + /// + /// There is deliberately NO `timeout_seconds` knob: warpline has no HTTP + /// path, the subprocess round-trip is short-lived, and a per-call timeout is + /// not yet wired — advertising one would promise a guarantee not delivered + /// (input-affordances-are-promises). Subprocess-hang handling is a tracked + /// follow-up. + pub actor: String, +} + +impl Default for WarplineConfig { + fn default() -> Self { + Self { + enabled: false, + actor: "loomweave-mcp".to_owned(), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum ProviderSelection { Disabled, diff --git a/crates/loomweave-federation/src/lib.rs b/crates/loomweave-federation/src/lib.rs index 71da905a..4e229208 100644 --- a/crates/loomweave-federation/src/lib.rs +++ b/crates/loomweave-federation/src/lib.rs @@ -6,3 +6,4 @@ pub mod filigree_url; pub mod loomweave_port; pub mod loomweave_url; pub mod scan_results; +pub mod warpline; diff --git a/crates/loomweave-federation/src/warpline.rs b/crates/loomweave-federation/src/warpline.rs new file mode 100644 index 00000000..6da77591 --- /dev/null +++ b/crates/loomweave-federation/src/warpline.rs @@ -0,0 +1,569 @@ +//! Warpline churn-count consumer for Loomweave. +//! +//! Loomweave's `entity_high_churn_list` / `entity_recent_change_list` MCP +//! surfaces were dead-by-design: loomweave does not populate `git_churn_count` +//! in v1.0 and, by the seam's HARD RULE, retains no cross-run history. Warpline +//! is the federation's temporal authority that *does* hold per-entity change +//! counts. This module is the read-time consumer of Warpline's FROZEN churn +//! read `warpline_entity_churn_count_get` (`warpline.entity_churn_count.v1`, +//! 2026-06-13 interface lock §1A / GV-LW-2). +//! +//! Discipline (all load-bearing, from the lock): +//! - **READ-ONLY / DEPENDENCY-SINK.** Nothing flows loomweave→warpline here. +//! loomweave asks for counts, joins at read time, and stores NOTHING — no new +//! table, no retained warpline fact (§5 HARD RULE). +//! - **ENRICH-ONLY HONEST-DEGRADE.** Warpline absent/disabled/unreachable → +//! the consumer reports honest-unavailable with a reason; it never breaks +//! loomweave's core flow and never reads absence as a clean/empty answer +//! (§1C, §2 ENRICH-ONLY). +//! - **SEI-KEYED, LOCATOR FALLBACK.** Refs are keyed on the SEI when loomweave +//! has resolved one, else on the entity locator (the entity id). A +//! never-observed ref returns `churn_count: 0` from warpline — a real, +//! complete answer, not an error (lock §1A "Keying"). +//! +//! Transport: Warpline is an MCP-stdio member (no HTTP read API), so it is +//! launched as a subprocess and driven over MCP stdio — the same mechanism the +//! Filigree MCP-tool calls use (`filigree::run_mcp_tool`). Kept self-contained +//! here rather than sharing filigree's private frame helpers. + +use std::collections::HashMap; +use std::io::{BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; + +use loomweave_core::plugin::{ContentLengthCeiling, Frame, read_frame, write_frame}; +use serde::Deserialize; +use thiserror::Error; + +use crate::config::WarplineConfig; + +/// The endorsed FROZEN tool name (lock §1A). The short shim is `churn`. +pub const WARPLINE_CHURN_TOOL: &str = "warpline_entity_churn_count_get"; +/// The frozen contract URI carried in warpline's success envelope `schema`. +pub const WARPLINE_CHURN_SCHEMA: &str = "warpline.entity_churn_count.v1"; + +/// A single entity ref Loomweave sends to warpline. The frozen ref shape is +/// `{kind, value}` (lock "Entity references and SEI keying"). Loomweave emits +/// `kind: "sei"` when it holds a resolved SEI, else `kind: "locator"` carrying +/// the entity id (which *is* a loomweave locator). +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +pub struct WarplineEntityRef { + pub kind: &'static str, + pub value: String, +} + +impl WarplineEntityRef { + /// SEI ref when `sei` is present and non-blank, else a locator ref keyed on + /// the entity id. Never drops a candidate — an unresolved entity is sent as + /// a locator and warpline answers `churn_count: 0` if it has never observed + /// it. + #[must_use] + pub fn for_entity(entity_id: &str, sei: Option<&str>) -> Self { + match sei.map(str::trim).filter(|s| !s.is_empty()) { + Some(sei) => Self { + kind: "sei", + value: sei.to_owned(), + }, + None => Self { + kind: "locator", + value: entity_id.to_owned(), + }, + } + } +} + +/// One `data.items[]` row from the frozen `warpline.entity_churn_count.v1` +/// output (lock §1A). Unknown fields are ignored so warpline can grow the row +/// without breaking this read. +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct ChurnItem { + /// Echoed entity keys. Carries both `sei` (null when warpline has not + /// resolved one) and `locator`. + #[serde(default)] + pub entity: ChurnEntity, + /// Count of change events. A never-observed ref is `0` (not omitted, not an + /// error) — the GV-LW-2 invariant. + pub churn_count: i64, + #[serde(default)] + pub first_changed_at: Option, + #[serde(default)] + pub last_changed_at: Option, + #[serde(default)] + pub last_actor: Option, +} + +#[derive(Debug, Clone, PartialEq, Default, Deserialize)] +pub struct ChurnEntity { + #[serde(default)] + pub sei: Option, + #[serde(default)] + pub locator: Option, +} + +/// The `data` payload of the frozen churn envelope (`data.items` is the part +/// loomweave joins on). +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct ChurnData { + #[serde(default)] + pub items: Vec, +} + +/// The full FROZEN success envelope warpline returns +/// (`{schema, ok, query, data, warnings, …}`). Loomweave reads `data.items`; +/// the rest is tolerated so the parse pins the *wire* shape, not a convenient +/// subset (GV-LW-2 is asserted against this envelope). +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct ChurnCountResponse { + #[serde(default)] + pub schema: Option, + #[serde(default)] + pub ok: Option, + pub data: ChurnData, +} + +impl ChurnCountResponse { + /// Index the returned counts by both SEI and locator so the caller can look + /// up a count for an entity regardless of which key it was sent under. The + /// value is the full [`ChurnItem`] (count + first/last/actor) — the recency + /// surface needs `last_changed_at`. + #[must_use] + pub fn index_by_key(&self) -> HashMap { + let mut by_key = HashMap::new(); + for item in &self.data.items { + if let Some(sei) = item.entity.sei.as_deref().filter(|s| !s.is_empty()) { + by_key.insert(sei.to_owned(), item); + } + if let Some(locator) = item.entity.locator.as_deref().filter(|s| !s.is_empty()) { + by_key.insert(locator.to_owned(), item); + } + } + by_key + } +} + +/// Parse the FROZEN churn envelope body. Pins the wire contract: a body that is +/// not `{…, "data": {"items": [...]}, …}` is a contract error, surfaced so the +/// caller degrades the surface to honest-unavailable rather than fabricating an +/// empty ranking. +/// +/// # Errors +/// Returns [`WarplineContractError`] when the body is not valid frozen-envelope +/// JSON. +pub fn parse_churn_count_response(body: &str) -> Result { + serde_json::from_str(body).map_err(WarplineContractError::from) +} + +#[derive(Debug, Error)] +pub enum WarplineContractError { + #[error("invalid Warpline churn response: {0}")] + InvalidResponse(#[from] serde_json::Error), +} + +#[derive(Debug, Error)] +pub enum WarplineClientError { + #[error("run Warpline MCP tool {tool}: {message}")] + McpTool { tool: String, message: String }, + + /// Warpline returned its FROZEN `warpline.error.v1` error envelope (e.g. + /// `invalid_changed_refs` for an unrecognised ref shape). + #[error("Warpline returned an error for {tool}: {message}")] + WarplineError { tool: String, message: String }, + + #[error(transparent)] + Contract(#[from] WarplineContractError), +} + +/// The read-only Warpline seam Loomweave depends on. ONE method — the churn +/// read. No timeline/blast-radius methods: an unused method would be +/// dead-by-design (the very thing this seam exists to cure). The default impl +/// reports the read unavailable so a test double / read-only deployment opts in +/// explicitly and cannot accidentally pretend a count was returned. +pub trait WarplineLookup: Send + Sync { + /// Per-entity change counts for `entity_refs` over an optional `window`, + /// keyed by SEI (or locator). `window` is the frozen + /// `{since, until, rev_range}` object; `None` means the all-time count. + /// Returns the full frozen envelope so the caller can index counts by key. + /// + /// # Errors + /// Returns [`WarplineClientError`] on transport failure, a warpline error + /// envelope, or an unparseable body — every one of which the caller treats + /// as honest-unavailable, never as a clean/empty ranking. + fn entity_churn_counts( + &self, + _entity_refs: &[WarplineEntityRef], + _window: Option<&serde_json::Value>, + ) -> Result { + Err(WarplineClientError::McpTool { + tool: WARPLINE_CHURN_TOOL.to_owned(), + message: "Warpline churn read is unavailable (no client configured)".to_owned(), + }) + } +} + +/// MCP-stdio client for Warpline's churn read. Construction is gated on +/// `config.enabled`; an absent client (`None`) is the honest-degrade default. +#[derive(Debug, Clone)] +pub struct WarplineMcpClient { + actor: String, + project_root: Option, +} + +impl WarplineMcpClient { + /// Build a client from config, or `None` when the seam is disabled. The + /// returned client is wired to reach warpline as a subprocess rooted at + /// `project_root`. + #[must_use] + pub fn from_config(config: &WarplineConfig, project_root: Option<&Path>) -> Option { + if !config.enabled { + return None; + } + Some(Self { + actor: config.actor.clone(), + project_root: project_root.map(Path::to_path_buf), + }) + } + + fn run_churn_tool( + &self, + arguments: &serde_json::Value, + ) -> Result { + let tool = WARPLINE_CHURN_TOOL; + let (program, args) = resolve_warpline_mcp_command(self.project_root.as_deref()); + let mut child = Command::new(&program) + .args(&args) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir( + self.project_root + .as_deref() + .unwrap_or_else(|| Path::new(".")), + ) + .spawn() + .map_err(|err| WarplineClientError::McpTool { + tool: tool.to_owned(), + message: format!("spawn {program}: {err}"), + })?; + let mut stdin = child + .stdin + .take() + .ok_or_else(|| WarplineClientError::McpTool { + tool: tool.to_owned(), + message: "child stdin unavailable".to_owned(), + })?; + let stdout = child + .stdout + .take() + .ok_or_else(|| WarplineClientError::McpTool { + tool: tool.to_owned(), + message: "child stdout unavailable".to_owned(), + })?; + let mut stdout = BufReader::new(stdout); + + write_mcp_frame( + &mut stdin, + &serde_json::json!({ + "jsonrpc": "2.0", + "id": "loomweave-init", + "method": "initialize", + "params": { + "protocolVersion": "2025-11-25", + "capabilities": {}, + "clientInfo": { "name": "loomweave", "version": env!("CARGO_PKG_VERSION") } + } + }), + tool, + )?; + let _ = read_mcp_frame(&mut stdout, "loomweave-init", tool)?; + write_mcp_frame( + &mut stdin, + &serde_json::json!({ + "jsonrpc": "2.0", + "method": "notifications/initialized", + "params": {} + }), + tool, + )?; + write_mcp_frame( + &mut stdin, + &serde_json::json!({ + "jsonrpc": "2.0", + "id": "loomweave-call", + "method": "tools/call", + "params": { "name": tool, "arguments": arguments } + }), + tool, + )?; + drop(stdin); + + let response = read_mcp_frame(&mut stdout, "loomweave-call", tool)?; + let _ = child.wait(); + if let Some(error) = response.get("error") { + return Err(WarplineClientError::McpTool { + tool: tool.to_owned(), + message: error.to_string(), + }); + } + let text = response + .get("result") + .and_then(|result| result.get("content")) + .and_then(serde_json::Value::as_array) + .and_then(|content| content.first()) + .and_then(|item| item.get("text")) + .and_then(serde_json::Value::as_str) + .ok_or_else(|| WarplineClientError::McpTool { + tool: tool.to_owned(), + message: format!("missing result.content[0].text in response {response}"), + })?; + let parsed: serde_json::Value = serde_json::from_str(text) + .map_err(|err| WarplineClientError::Contract(WarplineContractError::from(err)))?; + // A frozen `warpline.error.v1` body (or any `{ "error": … }`) is an + // honest "could not answer", surfaced so the caller degrades. + if let Some(error) = parsed.get("error") { + return Err(WarplineClientError::WarplineError { + tool: tool.to_owned(), + message: error.to_string(), + }); + } + Ok(parsed) + } +} + +impl WarplineLookup for WarplineMcpClient { + fn entity_churn_counts( + &self, + entity_refs: &[WarplineEntityRef], + window: Option<&serde_json::Value>, + ) -> Result { + let mut arguments = serde_json::json!({ + "entity_refs": entity_refs, + // Ask warpline to rank by count, descending — loomweave re-ranks its + // own scoped set from the returned counts regardless. + "sort_by": "churn_count", + "sort_order": "desc", + "actor": self.actor.clone(), + }); + if let (Some(window), Some(obj)) = (window, arguments.as_object_mut()) { + obj.insert("window".to_owned(), window.clone()); + } + // NOTE (known limitation): there is no per-call timeout. The subprocess + // round-trip is short-lived in practice, but a warpline child that + // accepts the connection and never responds would block this read. A + // `wait_timeout` wrapper is a tracked follow-up (matches the Filigree MCP + // path's current behaviour); a config knob is deliberately NOT advertised + // until it is honoured (input-affordances-are-promises). + let value = self.run_churn_tool(&arguments)?; + let body = value.to_string(); + parse_churn_count_response(&body).map_err(WarplineClientError::Contract) + } +} + +fn write_mcp_frame( + writer: &mut impl Write, + value: &serde_json::Value, + tool: &str, +) -> Result<(), WarplineClientError> { + let body = serde_json::to_vec(value).map_err(|err| WarplineClientError::McpTool { + tool: tool.to_owned(), + message: format!("serialize MCP request: {err}"), + })?; + write_frame(writer, &Frame { body }).map_err(|err| WarplineClientError::McpTool { + tool: tool.to_owned(), + message: format!("write MCP frame: {err}"), + }) +} + +fn read_mcp_frame( + reader: &mut impl std::io::BufRead, + expected_id: &str, + tool: &str, +) -> Result { + loop { + let frame = read_frame(reader, ContentLengthCeiling::DEFAULT).map_err(|err| { + WarplineClientError::McpTool { + tool: tool.to_owned(), + message: format!("read MCP frame: {err}"), + } + })?; + let value: serde_json::Value = + serde_json::from_slice(&frame.body).map_err(|err| WarplineClientError::McpTool { + tool: tool.to_owned(), + message: format!("parse MCP response: {err}"), + })?; + if value + .get("id") + .and_then(serde_json::Value::as_str) + .is_some_and(|id| id == expected_id) + { + return Ok(value); + } + } +} + +/// Resolve the command that launches warpline's MCP stdio server. Env override +/// `LOOMWEAVE_WARPLINE_MCP_COMMAND` (with a `{project}` placeholder) wins; else +/// the `warpline mcp` shim. +fn resolve_warpline_mcp_command(project_root: Option<&Path>) -> (String, Vec) { + if let Ok(raw) = std::env::var("LOOMWEAVE_WARPLINE_MCP_COMMAND") { + let mut parts: Vec = raw + .split_whitespace() + .map(|part| match project_root { + Some(root) => part.replace("{project}", &root.display().to_string()), + None => part.to_owned(), + }) + .collect(); + if let Some(program) = parts.first().cloned() { + parts.remove(0); + return (program, parts); + } + } + ("warpline".to_owned(), vec!["mcp".to_owned()]) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// The recorded FROZEN `warpline.entity_churn_count.v1` envelope used as the + /// GV-LW-2 producer fixture: 3 refs, two observed (`churn_count >= 1`), one + /// never-observed (`churn_count: 0`, present, not omitted, not an error). + const GV_LW_2_FIXTURE: &str = r#"{ + "schema": "warpline.entity_churn_count.v1", + "ok": true, + "query": { + "repo": "/abs/path", + "tool": "warpline_entity_churn_count_get", + "arguments": {}, + "filters": {}, + "sort": {"by": "churn_count", "order": "desc"}, + "page": {"limit": 100, "cursor": null} + }, + "data": { + "items": [ + {"entity": {"sei": "loomweave:eid:0000000000000000000000000000000a", + "locator": "python:function:src/pkg/mod.py::alpha"}, + "churn_count": 7, + "first_changed_at": "2026-05-01T00:00:00Z", + "last_changed_at": "2026-06-13T00:00:00Z", + "last_actor": "agent:codex"}, + {"entity": {"sei": "loomweave:eid:0000000000000000000000000000000b", + "locator": "python:function:src/pkg/mod.py::beta"}, + "churn_count": 2, + "first_changed_at": "2026-05-10T00:00:00Z", + "last_changed_at": "2026-06-01T00:00:00Z", + "last_actor": "agent:fable"}, + {"entity": {"sei": "loomweave:eid:0000000000000000000000000000000c", + "locator": "python:function:src/pkg/mod.py::gamma"}, + "churn_count": 0, + "first_changed_at": null, + "last_changed_at": null, + "last_actor": null} + ], + "window": {"since": null, "until": null, "rev_range": null}, + "page": {"limit": 100, "next_cursor": null, "has_more": false} + }, + "warnings": [], + "next_actions": {}, + "enrichment": {"sei": "present"}, + "meta": {"producer": {"tool": "warpline", "version": "0.1.0"}, + "local_only": true, "peer_side_effects": []} + }"#; + + #[test] + fn parses_frozen_churn_envelope_shape() { + // GV-LW-2 producer side: parse the FULL frozen envelope through the real + // parse path and pin the contract — 3 items, two observed, one zero. + let parsed = parse_churn_count_response(GV_LW_2_FIXTURE).expect("frozen envelope parses"); + // The producer's `schema` matches the frozen contract URI we pin to. + assert_eq!(parsed.schema.as_deref(), Some(WARPLINE_CHURN_SCHEMA)); + assert_eq!(parsed.ok, Some(true)); + assert_eq!( + parsed.data.items.len(), + 3, + "all 3 refs echoed, none omitted" + ); + + let observed: Vec = parsed + .data + .items + .iter() + .filter(|i| i.churn_count >= 1) + .map(|i| i.churn_count) + .collect(); + assert_eq!( + observed.len(), + 2, + "two observed refs carry churn_count >= 1" + ); + + let gamma = parsed + .data + .items + .iter() + .find(|i| i.entity.locator.as_deref() == Some("python:function:src/pkg/mod.py::gamma")) + .expect("the never-observed ref is present, not omitted"); + assert_eq!( + gamma.churn_count, 0, + "never-observed ref is 0, not an error" + ); + } + + #[test] + fn indexes_counts_by_both_sei_and_locator() { + let parsed = parse_churn_count_response(GV_LW_2_FIXTURE).unwrap(); + let by_key = parsed.index_by_key(); + // Look up by SEI... + assert_eq!( + by_key + .get("loomweave:eid:0000000000000000000000000000000a") + .map(|i| i.churn_count), + Some(7) + ); + // ...and by locator. + assert_eq!( + by_key + .get("python:function:src/pkg/mod.py::beta") + .map(|i| i.churn_count), + Some(2) + ); + } + + #[test] + fn ref_keys_on_sei_then_falls_back_to_locator() { + let with_sei = + WarplineEntityRef::for_entity("python:function:m::f", Some("loomweave:eid:abc")); + assert_eq!(with_sei.kind, "sei"); + assert_eq!(with_sei.value, "loomweave:eid:abc"); + + let no_sei = WarplineEntityRef::for_entity("python:function:m::f", None); + assert_eq!(no_sei.kind, "locator"); + assert_eq!(no_sei.value, "python:function:m::f"); + + // A blank SEI is treated as absent — locator fallback, never an empty key. + let blank_sei = WarplineEntityRef::for_entity("python:function:m::f", Some(" ")); + assert_eq!(blank_sei.kind, "locator"); + assert_eq!(blank_sei.value, "python:function:m::f"); + } + + #[test] + fn default_lookup_reports_unavailable_not_empty() { + // The honest-degrade default: a `WarplineLookup` with no override does + // NOT return an empty ranking — it errors, so the caller degrades to + // honest-unavailable rather than reading absence as "no churn". + struct Bare; + impl WarplineLookup for Bare {} + let err = Bare.entity_churn_counts(&[], None).unwrap_err(); + assert!(matches!(err, WarplineClientError::McpTool { .. })); + } + + #[test] + fn disabled_config_yields_no_client() { + let config = WarplineConfig::default(); // enabled: false + assert!(WarplineMcpClient::from_config(&config, None).is_none()); + let enabled = WarplineConfig { + enabled: true, + ..WarplineConfig::default() + }; + assert!(WarplineMcpClient::from_config(&enabled, None).is_some()); + } +} diff --git a/crates/loomweave-mcp/src/catalogue/shortcuts.rs b/crates/loomweave-mcp/src/catalogue/shortcuts.rs index 39a93afa..be9f4275 100644 --- a/crates/loomweave-mcp/src/catalogue/shortcuts.rs +++ b/crates/loomweave-mcp/src/catalogue/shortcuts.rs @@ -16,13 +16,14 @@ use serde_json::{Value, json}; use loomweave_core::{EdgeConfidence, McpErrorCode}; use loomweave_storage::{ - call_edges_targeting, entities_by_churn, entities_targeted_by_unresolved_call_sites, - entity_by_id, resolve_entity_ref, + call_edges_targeting, entities_for_churn_candidates, + entities_targeted_by_unresolved_call_sites, entity_by_id, resolve_entity_ref, sei_for_locator, }; use crate::ParamError; use crate::ServerState; -use crate::catalogue::{Page, RawScope, finalize_entity_page, missing_signal}; +use crate::catalogue::{Page, RawScope, ScopeFilter, finalize_entity_page, missing_signal}; +use crate::warpline::{ChurnCountResponse, WarplineEntityRef}; use crate::{ entity_json, flatten_storage_envelope_result, optional_bool, optional_confidence, optional_non_empty_string, required_str, success_envelope, tool_error_envelope, @@ -800,101 +801,343 @@ impl ServerState { Ok(flatten_storage_envelope_result(result)) } - /// `high_churn(limit?, scope?)` — entities ranked by `git_churn_count` - /// descending. The analyze pipeline does not populate churn in v1.0, so this - /// is honest-empty in practice (the missing signal is surfaced); the query is - /// real, so it lights up if churn is ever populated. Bounded, SEI-carrying. + /// `high_churn(limit?, scope?)` — entities ranked by change count, **descending**. + /// + /// Loomweave does not populate `git_churn_count` in v1.0 and, by the seam's + /// HARD RULE, retains no cross-run history — so the count comes from Warpline + /// (the federation's temporal authority) at read time. Loomweave owns the + /// entity catalogue (the candidate universe); Warpline owns the counts. The + /// flow: gather candidates → filter to scope → one bounded, SEI-keyed + /// `warpline_entity_churn_count_get` call → rank the scoped set by the + /// returned counts → page → graft the count onto each entity. Read-only, + /// enrich-only, dependency-sink: nothing is stored, nothing flows from + /// loomweave to warpline. + /// + /// Honest-degrade (lock §1C): warpline absent / disabled / unreachable → + /// honest-empty with a missing-signal note that NAMES warpline as the source + /// — never empty-as-clean, never a hard error breaking the core flow. + /// Bounded, SEI-carrying. pub(crate) async fn tool_high_churn( &self, arguments: &serde_json::Map, + ) -> std::result::Result { + self.churn_ranked_surface(arguments, ChurnMode::HighChurn) + .await + } + + /// `recently_changed(since?, scope?)` — entities with a recorded change + /// (`churn_count >= 1`) over the window `[since, now)`, ordered by + /// `last_changed_at` descending. + /// + /// Same Warpline-backed read as [`Self::tool_high_churn`] — loomweave has no + /// per-entity git change timestamp in v1.0, so warpline supplies both the + /// count and the `last_changed_at`. `since` is passed through as the warpline + /// `window.since`; entities with no recorded change in the window are + /// filtered out. Honest-degrade is identical to `high_churn` (warpline absent + /// → honest-empty + warpline-named missing-signal note, never empty-as-clean). + pub(crate) async fn tool_recently_changed( + &self, + arguments: &serde_json::Map, + ) -> std::result::Result { + let since = match arguments.get("since") { + None | Some(Value::Null) => None, + Some(Value::String(value)) => Some(value.clone()), + Some(_) => return Err(ParamError::new("since must be an ISO-8601 string or null")), + }; + self.churn_ranked_surface(arguments, ChurnMode::RecentlyChanged { since }) + .await + } + + /// Shared body for the two Warpline-backed churn surfaces. `mode` selects + /// `high_churn` (rank all candidates by count desc) vs `recently_changed` + /// (window `since`, keep `churn_count >= 1`, order by `last_changed_at` desc). + async fn churn_ranked_surface( + &self, + arguments: &serde_json::Map, + mode: ChurnMode, ) -> std::result::Result { let scope = RawScope::parse(arguments)?; let page = Page::parse(arguments, SHORTCUT_PAGE_DEFAULT, SHORTCUT_PAGE_MAX)?; let project_root = self.project_root.clone(); + let warpline_client = self.warpline_client.clone(); let result = self .readers .with_reader(move |conn| { let filter = scope.resolve(conn)?; - let (rows, scan_truncated) = entities_by_churn(conn, CHURN_SCAN_CAP)?; - // Keep churn alongside; finalize over the entity rows, then graft - // the churn count onto each returned entity. - let churn_by_id: std::collections::HashMap = - rows.iter().map(|(e, c)| (e.id.clone(), *c)).collect(); - let entities: Vec<_> = rows.into_iter().map(|(e, _)| e).collect(); - let mut response = finalize_entity_page( + // 1. Candidate universe = the entity catalogue (capped), NOT the + // empty `git_churn_count` scan. Warpline holds the counts. + let (candidates, scan_truncated) = + entities_for_churn_candidates(conn, CHURN_SCAN_CAP)?; + // 2. Scope-filter BEFORE asking warpline, so a scoped query keeps + // its in-scope high-churners regardless of the global ranking + // (the representative-fixture trap: scope-after-rank loses + // in-scope rows that fell below the global top-N). + let in_scope: Vec = candidates + .into_iter() + .filter(|e| { + filter.contains(&e.id, e.source_file_path.as_deref(), &project_root) + }) + .collect(); + + // Honest-degrade: no warpline client wired → the surface cannot + // rank. Honest-empty with a warpline-named missing-signal note. + let Some(client) = warpline_client.as_ref() else { + return Ok(success_envelope(churn_unavailable( + &filter, + scan_truncated, + &mode, + "warpline-disabled", + "Warpline churn integration is disabled \ + (integrations.warpline.enabled: false); enable it to rank by change count", + ))); + }; + + // 3. Build SEI-keyed refs (locator fallback when unresolved) and + // make ONE bounded warpline call (lock: one aggregation read, + // join, discard — not N timeline calls). + let refs: Vec = in_scope + .iter() + .map(|e| { + let sei = sei_for_locator(conn, &e.id).ok().flatten(); + WarplineEntityRef::for_entity(&e.id, sei.as_deref()) + }) + .collect(); + let response = match client.entity_churn_counts(&refs, mode.window().as_ref()) { + Ok(response) => response, + Err(err) => { + // Transport / warpline-error / unparseable → honest + // unavailable, NOT an empty (clean) ranking. + return Ok(success_envelope(churn_unavailable( + &filter, + scan_truncated, + &mode, + "warpline-unreachable", + &err.to_string(), + ))); + } + }; + + // 4-5. Rank the scoped set by the returned counts, page, finalize, + // and graft the warpline facts back on. + Ok(success_envelope(rank_and_finalize_churn( conn, &project_root, - entities, + in_scope, &filter, page, scan_truncated, - ); - if let Some(list) = response["entities"].as_array() { - let grafted: Vec = list - .iter() - .map(|entity| { - let mut entity = entity.clone(); - if let Some(object) = entity.as_object_mut() - && let Some(id) = object.get("id").and_then(Value::as_str) - && let Some(churn) = churn_by_id.get(id) - { - object.insert("git_churn_count".to_owned(), json!(churn)); - } - entity - }) - .collect(); - if let Some(object) = response.as_object_mut() { - object.insert("entities".to_owned(), Value::Array(grafted)); - } - } - if response["page"]["total"] == json!(0) - && let Some(object) = response.as_object_mut() - { - object.insert( - "signal".to_owned(), - missing_signal( - "git_churn_count", - "no entity carries git churn; the analyze pipeline does not populate \ - git_churn_count in v1.0", - ), - ); - } - Ok(success_envelope(response)) + &mode, + &response, + ))) }) .await; Ok(flatten_storage_envelope_result(result)) } +} - /// `recently_changed(since?, scope?)` — entities changed since a timestamp. - /// Loomweave does not index a per-entity git change timestamp in v1.0, so this - /// is an honest no-op: it returns an empty set with a missing-signal note - /// pointing at `index_diff` for repo-level freshness. The args are accepted - /// for forward-compatibility. Never fabricates a change set. - // Honest no-op: no storage read, but kept `async` for the uniform tool - // dispatch interface (every `tool_*` is awaited in `handle_tool_call`). - #[allow(clippy::unused_async)] - pub(crate) async fn tool_recently_changed( - &self, - arguments: &serde_json::Map, - ) -> std::result::Result { - // Validate args so a malformed call still errors honestly. - let _ = RawScope::parse(arguments)?; - let since = match arguments.get("since") { - None | Some(Value::Null) => None, - Some(Value::String(value)) => Some(value.clone()), - Some(_) => return Err(ParamError::new("since must be an ISO-8601 string or null")), - }; - Ok(success_envelope(json!({ - "entities": [], - "since": since, - "page": { "total": 0, "offset": 0, "limit": 0, "returned": 0, "truncated": false }, - "signal": missing_signal( - "git_change_time", - "Loomweave does not index a per-entity git change timestamp in v1.0; use index_diff \ - for repo-level freshness (HEAD vs last analyze)" - ), - }))) +/// Which of the two Warpline-backed churn surfaces is being served. +#[derive(Debug, Clone)] +enum ChurnMode { + /// `high_churn`: rank every candidate by count (desc); keep count-0 rows. + HighChurn, + /// `recently_changed`: window by `since`, drop count-0 rows, order by + /// `last_changed_at` (desc). + RecentlyChanged { since: Option }, +} + +impl ChurnMode { + /// The frozen warpline `window` object for this mode (`None` for + /// `high_churn`: all-time count). + fn window(&self) -> Option { + match self { + ChurnMode::HighChurn => None, + ChurnMode::RecentlyChanged { since } => Some(json!({ + "since": since, "until": Value::Null, "rev_range": Value::Null + })), + } + } + + fn is_recency(&self) -> bool { + matches!(self, ChurnMode::RecentlyChanged { .. }) + } + + /// Tag the response with `churn_source` provenance and (for recency) echo the + /// `since` argument. Applied to every answer, available or degraded. + fn tag(&self, object: &mut serde_json::Map) { + object.insert("churn_source".to_owned(), json!("warpline")); + if let ChurnMode::RecentlyChanged { since } = self { + object.insert("since".to_owned(), json!(since)); + } + } +} + +/// Rank the scoped candidates by warpline's returned counts, page + finalize, +/// and graft the churn facts onto each returned entity. `finalize_entity_page` +/// preserves input order (and re-applies scope idempotently), so passing the +/// already-ranked rows yields a ranked page. +#[allow(clippy::too_many_arguments)] +fn rank_and_finalize_churn( + conn: &rusqlite::Connection, + project_root: &std::path::Path, + in_scope: Vec, + filter: &ScopeFilter, + page: Page, + scan_truncated: bool, + mode: &ChurnMode, + response: &ChurnCountResponse, +) -> Value { + // Index counts by key (SEI or locator), then rank the scoped set. + let by_key = response.index_by_key(); + let is_recency = mode.is_recency(); + let mut ranked: Vec<(loomweave_storage::EntityRow, ChurnFacts)> = in_scope + .into_iter() + .filter_map(|entity| { + let sei = sei_for_locator(conn, &entity.id).ok().flatten(); + let key_sei = sei.as_deref().filter(|s| !s.is_empty()); + // Look up the count under whichever key warpline echoed. + let item = key_sei + .and_then(|s| by_key.get(s)) + .or_else(|| by_key.get(entity.id.as_str())); + let facts = ChurnFacts::from_item(item.copied()); + // recently_changed keeps only entities with a recorded change; + // high_churn keeps every candidate (count 0 included). + if is_recency && facts.churn_count < 1 { + None + } else { + Some((entity, facts)) + } + }) + .collect(); + if is_recency { + // Most-recently-changed first. `last_changed_at` is ISO-8601, so lexical + // desc == chronological desc; tie-break on id for determinism. + ranked.sort_by(|a, b| { + b.1.last_changed_at + .cmp(&a.1.last_changed_at) + .then_with(|| a.0.id.cmp(&b.0.id)) + }); + } else { + ranked.sort_by(|a, b| { + b.1.churn_count + .cmp(&a.1.churn_count) + .then_with(|| a.0.id.cmp(&b.0.id)) + }); + } + + let facts_by_id: std::collections::HashMap = ranked + .iter() + .map(|(e, f)| (e.id.clone(), f.clone())) + .collect(); + let ranked_entities: Vec = + ranked.into_iter().map(|(e, _)| e).collect(); + let mut response_json = finalize_entity_page( + conn, + project_root, + ranked_entities, + filter, + page, + scan_truncated, + ); + if let Some(list) = response_json["entities"].as_array() { + let grafted: Vec = list + .iter() + .map(|entity| { + let mut entity = entity.clone(); + if let Some(object) = entity.as_object_mut() + && let Some(id) = object.get("id").and_then(Value::as_str) + && let Some(facts) = facts_by_id.get(id) + { + facts.graft_onto(object); + } + entity + }) + .collect(); + if let Some(object) = response_json.as_object_mut() { + object.insert("entities".to_owned(), Value::Array(grafted)); + } + } + // Provenance + honest-empty note. A genuinely empty answer (warpline present, + // no in-scope entity carried a recorded change) is honest-empty, never clean. + let is_empty = response_json["page"]["total"] == json!(0); + if let Some(object) = response_json.as_object_mut() { + mode.tag(object); + if is_empty { + object.insert( + "signal".to_owned(), + missing_signal( + "warpline_churn", + "warpline reported no recorded change for any in-scope entity \ + (a complete answer, not an absence of code)", + ), + ); + } + } + response_json +} + +/// Warpline churn facts grafted onto an entity in the churn surfaces. A +/// candidate with no warpline row (never-observed) carries `churn_count: 0` and +/// null timestamps — a real, complete answer, not a missing fact. +#[derive(Debug, Clone)] +struct ChurnFacts { + churn_count: i64, + first_changed_at: Option, + last_changed_at: Option, + last_actor: Option, +} + +impl ChurnFacts { + fn from_item(item: Option<&crate::warpline::ChurnItem>) -> Self { + match item { + Some(item) => Self { + churn_count: item.churn_count, + first_changed_at: item.first_changed_at.clone(), + last_changed_at: item.last_changed_at.clone(), + last_actor: item.last_actor.clone(), + }, + None => Self { + churn_count: 0, + first_changed_at: None, + last_changed_at: None, + last_actor: None, + }, + } + } + + fn graft_onto(&self, object: &mut serde_json::Map) { + object.insert("churn_count".to_owned(), json!(self.churn_count)); + object.insert("first_changed_at".to_owned(), json!(self.first_changed_at)); + object.insert("last_changed_at".to_owned(), json!(self.last_changed_at)); + object.insert("last_actor".to_owned(), json!(self.last_actor)); + } +} + +/// The honest-degrade envelope for the churn surfaces when warpline cannot +/// answer (disabled / unreachable / error). An empty entity set, a +/// missing-signal note that NAMES warpline + the reason, and `churn_source: +/// warpline` provenance. Never empty-as-clean, never a hard error. Mirrors the +/// `scope_truncated`/`scan_truncated`/`page` shape of a normal answer so the +/// wire shape is stable across the available/unavailable paths. +fn churn_unavailable( + filter: &ScopeFilter, + scan_truncated: bool, + mode: &ChurnMode, + reason: &str, + message: &str, +) -> Value { + let mut response = json!({ + "entities": [], + "page": { "total": 0, "offset": 0, "limit": 0, "returned": 0, "truncated": false }, + "scope_truncated": filter.scope_truncated(), + "scan_truncated": scan_truncated, + "signal": missing_signal("warpline_churn", message), + "reason": reason, + }); + if let Some(object) = response.as_object_mut() { + // `churn_source: warpline` provenance + (for recency) the echoed `since`. + mode.tag(object); } + response } /// Of the given entity ids, those carrying the `test` categorisation tag. diff --git a/crates/loomweave-mcp/src/lib.rs b/crates/loomweave-mcp/src/lib.rs index caf00cd8..812f33de 100644 --- a/crates/loomweave-mcp/src/lib.rs +++ b/crates/loomweave-mcp/src/lib.rs @@ -10,6 +10,7 @@ pub mod scan_results; pub mod snapshot; mod tools; pub mod wardline_reconcile; +pub mod warpline; use std::collections::{BTreeSet, HashMap}; use std::path::{Component, Path, PathBuf}; @@ -711,12 +712,12 @@ pub fn list_tools() -> Vec { }, ToolDefinition { name: "entity_high_churn_list", - description: "Entities ranked by git churn, optional `scope`. v1.0 does not populate churn, so this is honest-empty in practice. Bounded.", + description: "Entities ranked by change count (desc), optional `scope`. Counts (churn_count/last_changed_at/last_actor) from Warpline at read time; Warpline off/unreachable → honest-empty (warpline signal). Bounded.", input_schema: scope_page_schema(false), }, ToolDefinition { name: "entity_recent_change_list", - description: "Entities changed since `since`, optional `scope`. v1.0 indexes no per-entity change time, so this is an honest no-op with a note pointing at index_diff_get. Never fabricates.", + description: "Entities changed since `since` (optional), most-recent first, optional `scope`. Change facts from Warpline at read time; Warpline off/unreachable → honest-empty (warpline signal).", input_schema: scope_page_schema(true), }, ToolDefinition { @@ -1210,6 +1211,13 @@ pub struct ServerState { budget: Arc>, inferred_inflight: InferredInflight, filigree_client: Option>, + /// Read-only consumer of Warpline's FROZEN churn read, injected by `serve` + /// when the warpline integration is enabled. `None` is the honest-degrade + /// default: the churn surfaces stay honest-empty (with a missing-signal note + /// naming warpline) rather than fabricating a ranking. Enrich-only, + /// dependency-sink — loomweave reads warpline's churn count and retains + /// nothing (the loomweave↔warpline HARD RULE). + warpline_client: Option>, diagnostics: Option, tool_policy: McpToolPolicy, /// Supervised `loomweave analyze` runs launched via `analyze_start`. @@ -1241,6 +1249,7 @@ impl ServerState { budget: Arc::new(Mutex::new(BudgetLedger::default())), inferred_inflight: Arc::new(AsyncMutex::new(HashMap::new())), filigree_client: None, + warpline_client: None, diagnostics: None, tool_policy: McpToolPolicy::default(), analyze_runs: Arc::new(Mutex::new(HashMap::new())), @@ -1342,6 +1351,18 @@ impl ServerState { self } + /// Inject the read-only Warpline churn consumer used by the high-churn / + /// recently-changed surfaces. Absent → those surfaces degrade honestly + /// (honest-empty with a missing-signal note). Enrich-only, dependency-sink. + #[must_use] + pub fn with_warpline_client( + mut self, + client: Arc, + ) -> Self { + self.warpline_client = Some(client); + self + } + #[must_use] pub fn with_diagnostics(mut self, diagnostics: DiagnosticsContext) -> Self { self.diagnostics = Some(diagnostics); diff --git a/crates/loomweave-mcp/src/warpline.rs b/crates/loomweave-mcp/src/warpline.rs new file mode 100644 index 00000000..1d56004b --- /dev/null +++ b/crates/loomweave-mcp/src/warpline.rs @@ -0,0 +1 @@ +pub use loomweave_federation::warpline::*; diff --git a/crates/loomweave-mcp/tests/catalogue_tools.rs b/crates/loomweave-mcp/tests/catalogue_tools.rs index 748f2236..af25b650 100644 --- a/crates/loomweave-mcp/tests/catalogue_tools.rs +++ b/crates/loomweave-mcp/tests/catalogue_tools.rs @@ -1639,7 +1639,6 @@ async fn categorisation_shortcuts_are_honest_empty() { "find_tests", "find_deprecations", "find_todos", - "high_churn", ] { let env = call_tool(&state, tool, json!({})).await; assert_eq!(env["ok"], true, "{tool}: {env}"); @@ -1648,6 +1647,29 @@ async fn categorisation_shortcuts_are_honest_empty() { } } +#[tokio::test] +async fn high_churn_degrades_honestly_without_warpline() { + // `high_churn` now ranks by Warpline's change count at read time. With no + // warpline client wired (the default), it degrades to honest-empty with a + // warpline-named signal — never empty-as-clean, never a hard error. + let (project, db, conn) = open_project(); + insert_entity(&conn, "python:function:a", "function", "a.py", Some((1, 2))); + drop(conn); + let state = state_for(project.path(), &db); + let env = call_tool(&state, "high_churn", json!({})).await; + assert_eq!(env["ok"], true, "{env}"); + assert_eq!( + env["error"], + serde_json::Value::Null, + "no hard error: {env}" + ); + assert_eq!(env["result"]["page"]["total"], 0, "{env}"); + assert_eq!(env["result"]["signal"]["available"], false, "{env}"); + assert_eq!(env["result"]["signal"]["signal"], "warpline_churn", "{env}"); + assert_eq!(env["result"]["churn_source"], "warpline", "{env}"); + assert_eq!(env["result"]["reason"], "warpline-disabled", "{env}"); +} + #[tokio::test] async fn find_tests_lights_up_when_test_tag_is_present() { // The query is real: if a plugin ever emits the `test` tag, the tool returns @@ -1749,7 +1771,11 @@ async fn what_tests_this_returns_test_tagged_callers() { } #[tokio::test] -async fn recently_changed_is_honest_noop() { +async fn recently_changed_degrades_honestly_without_warpline() { + // `recently_changed` is no longer a no-op: it ranks by Warpline's change + // facts at read time. With no warpline client wired (the default), it + // degrades to honest-empty with a warpline-named signal (never the old + // `git_change_time` no-op note, never empty-as-clean, never a hard error). let (project, db, conn) = open_project(); insert_entity(&conn, "python:function:a", "function", "a.py", Some((1, 2))); drop(conn); @@ -1761,8 +1787,17 @@ async fn recently_changed_is_honest_noop() { ) .await; assert_eq!(env["ok"], true, "{env}"); + assert_eq!( + env["error"], + serde_json::Value::Null, + "no hard error: {env}" + ); assert_eq!(env["result"]["page"]["total"], 0); - assert_eq!(env["result"]["signal"]["signal"], "git_change_time"); + assert_eq!(env["result"]["signal"]["signal"], "warpline_churn", "{env}"); + assert_eq!(env["result"]["churn_source"], "warpline", "{env}"); + assert_eq!(env["result"]["reason"], "warpline-disabled", "{env}"); + // The `since` argument is echoed back even on the degrade path. + assert_eq!(env["result"]["since"], "2026-01-01T00:00:00Z", "{env}"); } #[tokio::test] diff --git a/crates/loomweave-mcp/tests/warpline_churn_consumer.rs b/crates/loomweave-mcp/tests/warpline_churn_consumer.rs new file mode 100644 index 00000000..28af6c8e --- /dev/null +++ b/crates/loomweave-mcp/tests/warpline_churn_consumer.rs @@ -0,0 +1,352 @@ +//! GV-LW-2 conformance + honest-degrade for the Warpline churn consumer. +//! +//! The dead `entity_high_churn_list` / `entity_recent_change_list` surfaces are +//! lit up by consuming Warpline's FROZEN `warpline_entity_churn_count_get` +//! (`warpline.entity_churn_count.v1`) at read time (2026-06-13 interface lock +//! §1A / GV-LW-2). These tests inject a FAKE `WarplineLookup` that parses the +//! frozen *envelope* fixture through the real parse path — they NEVER make a +//! live Warpline MCP call (a hub-rooted session would misroute it; the contract +//! shape is what conformance pins). +//! +//! GV-LW-2 (lock §1D): input 3 SEIs, one never-observed → `data.items` len 3, +//! the observed two carry `churn_count >= 1`, the unobserved carries +//! `churn_count: 0` (not omitted, not error). + +use std::sync::{Arc, Mutex}; + +use loomweave_mcp::ServerState; +use loomweave_mcp::warpline::{ + ChurnCountResponse, WarplineClientError, WarplineEntityRef, WarplineLookup, + parse_churn_count_response, +}; +use loomweave_storage::{ReaderPool, pragma, schema}; +use rusqlite::{Connection, params}; +use serde_json::{Value, json}; + +// The three SEIs of the GV-LW-2 vector. alpha + beta are observed by warpline; +// gamma is never-observed. +const SEI_ALPHA: &str = "loomweave:eid:0000000000000000000000000000000a"; +const SEI_BETA: &str = "loomweave:eid:0000000000000000000000000000000b"; +const SEI_GAMMA: &str = "loomweave:eid:0000000000000000000000000000000c"; + +const LOC_ALPHA: &str = "python:function:src/pkg/mod.py::alpha"; +const LOC_BETA: &str = "python:function:src/pkg/mod.py::beta"; +const LOC_GAMMA: &str = "python:function:src/pkg/mod.py::gamma"; + +/// The recorded FROZEN `warpline.entity_churn_count.v1` envelope — the GV-LW-2 +/// producer fixture (full envelope, not a convenient subset). alpha=7, beta=2, +/// gamma=0 (present, not omitted). +const GV_LW_2_ENVELOPE: &str = r#"{ + "schema": "warpline.entity_churn_count.v1", + "ok": true, + "query": {"repo": "/abs/path", "tool": "warpline_entity_churn_count_get", + "arguments": {}, "filters": {}, "sort": {"by": "churn_count", "order": "desc"}, + "page": {"limit": 100, "cursor": null}}, + "data": { + "items": [ + {"entity": {"sei": "loomweave:eid:0000000000000000000000000000000a", + "locator": "python:function:src/pkg/mod.py::alpha"}, + "churn_count": 7, "first_changed_at": "2026-05-01T00:00:00Z", + "last_changed_at": "2026-06-13T00:00:00Z", "last_actor": "agent:codex"}, + {"entity": {"sei": "loomweave:eid:0000000000000000000000000000000b", + "locator": "python:function:src/pkg/mod.py::beta"}, + "churn_count": 2, "first_changed_at": "2026-05-10T00:00:00Z", + "last_changed_at": "2026-06-01T00:00:00Z", "last_actor": "agent:fable"}, + {"entity": {"sei": "loomweave:eid:0000000000000000000000000000000c", + "locator": "python:function:src/pkg/mod.py::gamma"}, + "churn_count": 0, "first_changed_at": null, "last_changed_at": null, "last_actor": null} + ], + "window": {"since": null, "until": null, "rev_range": null}, + "page": {"limit": 100, "next_cursor": null, "has_more": false} + }, + "warnings": [], "next_actions": {}, + "enrichment": {"sei": "present"}, + "meta": {"producer": {"tool": "warpline", "version": "0.1.0"}, + "local_only": true, "peer_side_effects": []} +}"#; + +/// A fake warpline client that replays the recorded frozen envelope through the +/// REAL parse path (`parse_churn_count_response`) — exactly what the live MCP +/// client does after reading the subprocess response. Records the refs it was +/// asked about so the test can assert loomweave sent SEI-keyed refs. +#[derive(Default)] +struct FakeWarplineClient { + seen_refs: Mutex>, + seen_window: Mutex>, +} + +impl WarplineLookup for FakeWarplineClient { + fn entity_churn_counts( + &self, + entity_refs: &[WarplineEntityRef], + window: Option<&Value>, + ) -> Result { + *self.seen_refs.lock().unwrap() = entity_refs.to_vec(); + *self.seen_window.lock().unwrap() = window.cloned(); + // Parse the recorded frozen envelope through the production parse path. + parse_churn_count_response(GV_LW_2_ENVELOPE).map_err(WarplineClientError::Contract) + } +} + +/// A warpline client that always errors — models unreachable / a frozen +/// `warpline.error.v1` body. The consumer must degrade to honest-unavailable. +struct UnreachableWarplineClient; + +impl WarplineLookup for UnreachableWarplineClient { + fn entity_churn_counts( + &self, + _entity_refs: &[WarplineEntityRef], + _window: Option<&Value>, + ) -> Result { + Err(WarplineClientError::WarplineError { + tool: "warpline_entity_churn_count_get".to_owned(), + message: "peer_unavailable".to_owned(), + }) + } +} + +fn open_project() -> (tempfile::TempDir, std::path::PathBuf, Connection) { + let project = tempfile::tempdir().expect("temp project"); + let dir = project.path().join(".weft/loomweave"); + std::fs::create_dir_all(&dir).expect("create .weft/loomweave"); + let db_path = dir.join("loomweave.db"); + let mut conn = Connection::open(&db_path).expect("open sqlite"); + pragma::apply_write_pragmas(&conn).expect("write pragmas"); + schema::apply_migrations(&mut conn).expect("apply migrations"); + (project, db_path, conn) +} + +fn insert_entity(conn: &Connection, id: &str) { + conn.execute( + "INSERT INTO entities (id, plugin_id, kind, name, short_name, source_file_path, \ + properties, content_hash, created_at, updated_at) \ + VALUES (?1,'python','function',?1,?1,'src/pkg/mod.py','{}','hash', \ + '2026-01-01T00:00:00.000Z','2026-01-01T00:00:00.000Z')", + params![id], + ) + .expect("insert entity"); +} + +fn insert_alive_sei(conn: &Connection, sei: &str, locator: &str) { + // A run row is required by the sei_bindings.born_run_id/updated_run_id FKs. + conn.execute( + "INSERT OR IGNORE INTO runs (id, started_at, config, stats, status) \ + VALUES ('run-1','2026-01-01T00:00:00.000Z','{}','{}','completed')", + [], + ) + .expect("insert run"); + conn.execute( + "INSERT INTO sei_bindings (sei, current_locator, body_hash, signature, status, \ + born_run_id, updated_run_id, updated_at) \ + VALUES (?1, ?2, 'bh', NULL, 'alive', 'run-1', 'run-1', '2026-01-01T00:00:00.000Z')", + params![sei, locator], + ) + .expect("insert sei binding"); +} + +/// Seed the 3 GV-LW-2 entities, each bound to its SEI. +fn seed_three_entities(conn: &Connection) { + for (loc, sei) in [ + (LOC_ALPHA, SEI_ALPHA), + (LOC_BETA, SEI_BETA), + (LOC_GAMMA, SEI_GAMMA), + ] { + insert_entity(conn, loc); + insert_alive_sei(conn, sei, loc); + } +} + +fn state_with_warpline( + project_root: &std::path::Path, + db_path: &std::path::Path, + client: Arc, +) -> ServerState { + let pool = ReaderPool::open(db_path, 2).expect("reader pool"); + ServerState::new(project_root.to_path_buf(), pool).with_warpline_client(client) +} + +fn state_without_warpline( + project_root: &std::path::Path, + db_path: &std::path::Path, +) -> ServerState { + let pool = ReaderPool::open(db_path, 2).expect("reader pool"); + ServerState::new(project_root.to_path_buf(), pool) +} + +/// Call a tool and return the FULL success envelope +/// (`{ok, result, error, …}`). The tool payload lives under `["result"]`; +/// `["error"]` is `null` on success (a hard error sets it non-null). +async fn call_tool(state: &ServerState, name: &str, arguments: Value) -> Value { + let response = state + .handle_json_rpc(&json!({ + "jsonrpc": "2.0", "id": "t", "method": "tools/call", + "params": {"name": name, "arguments": arguments} + })) + .await + .expect("tools/call returns a response"); + let text = response["result"]["content"][0]["text"] + .as_str() + .expect("tool content text"); + serde_json::from_str(text).expect("tool envelope JSON") +} + +/// The tool succeeded (did not hard-error) — assert the envelope and return the +/// inner payload (`result`). +fn ok_payload(envelope: &Value) -> &Value { + assert_eq!( + envelope["error"], + Value::Null, + "tool hard-errored — the core flow must not break: {envelope}" + ); + &envelope["result"] +} + +/// GV-LW-2: `high_churn` over the 3-entity vector → all 3 echoed, two `>= 1`, the +/// never-observed one `churn_count: 0` (present, not omitted, not error), +/// ranked by count descending, count grafted onto each entity. +#[tokio::test] +async fn gv_lw_2_high_churn_ranks_three_entities() { + let (project, db, conn) = open_project(); + seed_three_entities(&conn); + let fake = Arc::new(FakeWarplineClient::default()); + let state = state_with_warpline(project.path(), &db, fake.clone()); + + let envelope = call_tool(&state, "entity_high_churn_list", json!({})).await; + let result = ok_payload(&envelope); + let entities = result["entities"].as_array().expect("entities array"); + + // All 3 refs are present — none omitted (the gamma=0 invariant). + assert_eq!(entities.len(), 3, "all 3 candidates ranked, none omitted"); + assert_eq!(result["page"]["total"], json!(3)); + assert_eq!(result["churn_source"], json!("warpline")); + + // Ranked by count descending: alpha(7), beta(2), gamma(0). + assert_eq!(entities[0]["id"], json!(LOC_ALPHA)); + assert_eq!(entities[0]["churn_count"], json!(7)); + assert_eq!(entities[1]["id"], json!(LOC_BETA)); + assert_eq!(entities[1]["churn_count"], json!(2)); + assert_eq!(entities[2]["id"], json!(LOC_GAMMA)); + assert_eq!( + entities[2]["churn_count"], + json!(0), + "the never-observed entity is present with churn_count 0, not omitted, not an error" + ); + + // Two observed entities carry churn_count >= 1. + let observed = entities + .iter() + .filter(|e| e["churn_count"].as_i64().unwrap_or(0) >= 1) + .count(); + assert_eq!(observed, 2); + + // The recency fields are grafted from the frozen envelope. + assert_eq!( + entities[0]["last_changed_at"], + json!("2026-06-13T00:00:00Z") + ); + assert_eq!(entities[0]["last_actor"], json!("agent:codex")); + + // Loomweave sent SEI-keyed refs (one per candidate) — the keying contract. + let refs = fake.seen_refs.lock().unwrap(); + assert_eq!(refs.len(), 3, "one ref per candidate, one bounded call"); + assert!( + refs.iter().all(|r| r.kind == "sei"), + "every candidate had a resolved SEI, so every ref is SEI-keyed" + ); + let values: Vec<&str> = refs.iter().map(|r| r.value.as_str()).collect(); + assert!( + values.contains(&SEI_ALPHA) && values.contains(&SEI_BETA) && values.contains(&SEI_GAMMA) + ); +} + +/// `recently_changed` over the same vector → only the entities with a recorded +/// change (`churn_count >= 1`) survive, ordered by `last_changed_at` desc; the +/// `since` window is forwarded to warpline. +#[tokio::test] +async fn recently_changed_filters_unobserved_and_orders_by_last_change() { + let (project, db, conn) = open_project(); + seed_three_entities(&conn); + let fake = Arc::new(FakeWarplineClient::default()); + let state = state_with_warpline(project.path(), &db, fake.clone()); + + let envelope = call_tool( + &state, + "entity_recent_change_list", + json!({ "since": "2026-05-01T00:00:00Z" }), + ) + .await; + let result = ok_payload(&envelope); + let entities = result["entities"].as_array().expect("entities array"); + + // gamma (churn_count 0) is filtered out; alpha + beta remain. + assert_eq!( + entities.len(), + 2, + "only entities with a recorded change remain" + ); + // Ordered by last_changed_at desc: alpha (06-13) before beta (06-01). + assert_eq!(entities[0]["id"], json!(LOC_ALPHA)); + assert_eq!(entities[1]["id"], json!(LOC_BETA)); + assert_eq!(result["since"], json!("2026-05-01T00:00:00Z")); + assert_eq!(result["churn_source"], json!("warpline")); + + // The `since` was forwarded into warpline's window. + let window = fake.seen_window.lock().unwrap(); + assert_eq!( + window.as_ref().and_then(|w| w.get("since")), + Some(&json!("2026-05-01T00:00:00Z")) + ); +} + +/// Honest-degrade — NO warpline client wired (disabled): the surface returns +/// honest-unavailable with a warpline-named reason, NOT empty-as-clean, and does +/// NOT hard-error. (lock §1C, ENRICH-ONLY invariant.) +#[tokio::test] +async fn high_churn_degrades_honestly_when_warpline_absent() { + let (project, db, conn) = open_project(); + seed_three_entities(&conn); + let state = state_without_warpline(project.path(), &db); + + let envelope = call_tool(&state, "entity_high_churn_list", json!({})).await; + // Not a hard error — the tool answered. + let result = ok_payload(&envelope); + // Empty, but explicitly NOT clean: a warpline-named signal carries the reason. + assert_eq!(result["entities"].as_array().map(Vec::len), Some(0)); + assert_eq!(result["page"]["total"], json!(0)); + assert_eq!(result["churn_source"], json!("warpline")); + assert_eq!(result["reason"], json!("warpline-disabled")); + assert_eq!(result["signal"]["available"], json!(false)); + assert_eq!(result["signal"]["signal"], json!("warpline_churn")); + assert!( + result["signal"]["reason"] + .as_str() + .unwrap_or_default() + .to_lowercase() + .contains("warpline"), + "the missing-signal note must name warpline as the source" + ); +} + +/// Honest-degrade — warpline wired but unreachable / returns an error envelope: +/// same honest-unavailable shape (`warpline-unreachable`), never empty-as-clean, +/// never a hard error. Distinguishes "warpline could not answer" from "warpline +/// answered with genuine zeros". +#[tokio::test] +async fn high_churn_degrades_honestly_when_warpline_unreachable() { + let (project, db, conn) = open_project(); + seed_three_entities(&conn); + let state = state_with_warpline(project.path(), &db, Arc::new(UnreachableWarplineClient)); + + let envelope = call_tool(&state, "entity_high_churn_list", json!({})).await; + let result = ok_payload(&envelope); + assert_eq!(result["entities"].as_array().map(Vec::len), Some(0)); + assert_eq!(result["reason"], json!("warpline-unreachable")); + assert_eq!(result["churn_source"], json!("warpline")); + assert!( + result["signal"]["reason"] + .as_str() + .unwrap_or_default() + .contains("peer_unavailable"), + "the warpline error reason is surfaced, not swallowed" + ); +} diff --git a/crates/loomweave-storage/src/lib.rs b/crates/loomweave-storage/src/lib.rs index 9300a1f5..caa31204 100644 --- a/crates/loomweave-storage/src/lib.rs +++ b/crates/loomweave-storage/src/lib.rs @@ -59,10 +59,10 @@ pub use query::{ candidate_entities_for_unresolved_sites, child_entity_ids, contained_entity_ids, containing_module_id, current_file_hash, duplicate_locator_collision, edge_total, entities_by_churn, entities_by_kind, entities_by_tag, entities_containing_line, - entities_targeted_by_unresolved_call_sites, entities_with_wardline_facts, entity_at_line, - entity_briefing_block_reason, entity_by_id, entity_ids_in_namespace, entity_total, - entity_visibility, existing_entity_ids, find_entities, findings_for_emit, - import_edges_for_entity, known_entity_kinds, known_entity_tags, + entities_for_churn_candidates, entities_targeted_by_unresolved_call_sites, + entities_with_wardline_facts, entity_at_line, entity_briefing_block_reason, entity_by_id, + entity_ids_in_namespace, entity_total, entity_visibility, existing_entity_ids, find_entities, + findings_for_emit, import_edges_for_entity, known_entity_kinds, known_entity_tags, live_unresolved_call_sites_exist, module_dependency_edges, module_reference_rollup, normalize_source_path, preferred_finding_anchor_by_file, reference_edges_for_entity, relation_edges_for_entity, resolve_entity_ref, resolve_file, resolve_file_catalog_entry, diff --git a/crates/loomweave-storage/src/query.rs b/crates/loomweave-storage/src/query.rs index 6e6af97a..fb537765 100644 --- a/crates/loomweave-storage/src/query.rs +++ b/crates/loomweave-storage/src/query.rs @@ -1319,6 +1319,25 @@ pub fn entities_by_churn( Ok((out, truncated)) } +/// Candidate universe for the warpline-backed churn surfaces +/// (`entity_high_churn_list` / `entity_recent_change_list`). Unlike +/// [`entities_by_churn`], this does NOT filter on `git_churn_count` (loomweave +/// does not populate it in v1.0, so that scan is empty). Loomweave owns the +/// entity *catalogue*; warpline owns the change *counts*. The read layer sends +/// these rows' SEIs/locators to warpline and ranks by the returned counts. Rows +/// are ordered by id (a stable, content-free order — warpline supplies the +/// ranking) and materialised up to `scan_cap`. Returns `(rows, scan_truncated)`. +pub fn entities_for_churn_candidates( + conn: &Connection, + scan_cap: usize, +) -> Result<(Vec, bool)> { + let limit = i64::try_from(scan_cap.saturating_add(1)).unwrap_or(i64::MAX); + let sql = format!("SELECT {ENTITY_COLUMNS} FROM entities ORDER BY id LIMIT ?1"); + let mut stmt = conn.prepare(&sql)?; + let rows = stmt.query_map(params![limit], map_entity_row)?; + collect_capped(rows, scan_cap) +} + pub fn call_edges_targeting( conn: &Connection, target_id: &str, From 30549a3af175d8f4d8ffa043a44d7de0403a40a4 Mon Sep 17 00:00:00 2001 From: John Morrissey <544926+tachyon-beep@users.noreply.github.com> Date: Sun, 28 Jun 2026 17:23:22 +1000 Subject: [PATCH 2/2] fix(churn): speak warpline-mcp's newline JSON-RPC + honest paging/keying disclosure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Warpline churn consumer was a live NO-GO: entity_high_churn_list / entity_recent_change_list hung when integrations.warpline.enabled=true. Two transport bugs, both confirmed against warpline's source and live on lacuna: 1. Framing mismatch. The client drove warpline over Loomweave's Content-Length plugin framing, but warpline-mcp reads NEWLINE-delimited JSON-RPC (`for line in sys.stdin`, one response line per request line) — so warpline never parsed the request and the read blocked forever. 2. Wrong launcher (found only live). The default command was `warpline mcp`, which is not a valid warpline subcommand (usage error -> broken pipe). The MCP stdio server ships as the standalone `warpline-mcp` binary. Transport rewrite (loomweave-federation/src/warpline.rs): - Newline-delimited JSON-RPC; the whole handshake+call runs on a worker thread bounded by recv_timeout + kill-on-timeout, so a hung warpline degrades to the honest `warpline-unreachable` response instead of hanging (req #5). stderr is discarded (Stdio::null) so a large traceback can't block the child. - Default launcher resolves to `warpline-mcp` (not `warpline mcp`). - Send required `repo: ` (req #2); drop the unsupported `actor` param (req #3) — it is not in warpline's frozen churn schema. Parse result.structuredContent, falling back to content[0].text (req #4). - Config: keep WarplineConfig.actor (reserved, for deny_unknown_fields back-compat with configs that set it) and add an honored timeout_seconds (default 10), retiring the stale "no timeout knob" doc. Honesty floor (two zeros must not be conflated with a never-observed 0): - Send limit = refs.len() so warpline's default 100-item page does not silently cap the join at the top-100-by-churn. - churn_truncated + ChurnCountResponse::overflow_partial(): warpline keeps a 200-item in-band lead and spills the rest (apply_overflow), so an over-cap scope's truncated tail is DISCLOSED, not shipped as fabricated zeros. - churn_unresolved + unresolved_ref_count(): warpline returns locator:null for an unresolved SEI ref (a resolve hit always sets the NOT-NULL locator), so a keying-miss 0 ("real churn UNKNOWN") is disclosed rather than read as "this code never changes". Closes the disclosure asymmetry with churn_truncated. Disabled / unreachable / empty degrade paths (warpline-disabled, warpline-unreachable, churn_source:"warpline", signal) are preserved (req #6). Validated live on /home/john/lacuna (release + uv-installed binary): enabled -> real nonzero ranked churn with churn_truncated{counted:200,total:580}; recent_change -> real recency, no hang; scoped -> churn_unresolved{count:54}; disabled -> honest-empty. 564 federation+mcp tests pass; fmt + workspace clippy + cargo doc all clean. Follow-ups (filed as filigree observations, not addressed here): deep-pagination via warpline's overflow dump for >200-candidate scopes; the loomweave<->warpline locator-dialect + NULL-sei keying gap that produces the unresolved refs; and the same Content-Length-vs-newline framing latent in filigree.rs. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/loomweave-federation/src/config.rs | 26 +- crates/loomweave-federation/src/warpline.rs | 679 ++++++++++++++---- .../loomweave-mcp/src/catalogue/shortcuts.rs | 52 ++ .../tests/warpline_churn_consumer.rs | 141 ++++ 4 files changed, 758 insertions(+), 140 deletions(-) diff --git a/crates/loomweave-federation/src/config.rs b/crates/loomweave-federation/src/config.rs index 92917e50..e554203a 100644 --- a/crates/loomweave-federation/src/config.rs +++ b/crates/loomweave-federation/src/config.rs @@ -777,18 +777,21 @@ pub struct WarplineConfig { /// until an operator opts in. A missing/unreachable warpline with this /// `true` degrades the same way — never an error, never empty-as-clean. pub enabled: bool, - /// Actor identity carried on the warpline call. Warpline is an MCP-stdio - /// member (no HTTP read API), so it is launched as a subprocess and driven - /// over its MCP stdio transport — the same mechanism the Filigree MCP-tool - /// calls use. The command is resolved at call time (env override - /// `LOOMWEAVE_WARPLINE_MCP_COMMAND`, else the `warpline mcp` shim). - /// - /// There is deliberately NO `timeout_seconds` knob: warpline has no HTTP - /// path, the subprocess round-trip is short-lived, and a per-call timeout is - /// not yet wired — advertising one would promise a guarantee not delivered - /// (input-affordances-are-promises). Subprocess-hang handling is a tracked - /// follow-up. + /// Operator-configured actor identity. **Reserved, not sent on the wire:** + /// `actor` is not in warpline's FROZEN `warpline_entity_churn_count_get` + /// schema (`additionalProperties: false`), so the churn read carries none. + /// The field is retained (rather than removed) so an existing `loomweave.yaml` + /// that sets `integrations.warpline.actor` still parses under + /// `deny_unknown_fields` — warpline's own dogfood config sets it. pub actor: String, + /// Per-call timeout (seconds) for the warpline churn subprocess round-trip. + /// Warpline is an MCP-stdio member (no HTTP read API), launched as a + /// subprocess and driven over **newline-delimited** MCP JSON-RPC (the + /// transport `warpline-mcp` actually speaks). A warpline child that accepts + /// the connection and never answers would otherwise hang the read; this + /// bound makes a transport fault degrade to the honest `warpline-unreachable` + /// response instead. Default 10s; a `0` is floored to 1s by the client. + pub timeout_seconds: u64, } impl Default for WarplineConfig { @@ -796,6 +799,7 @@ impl Default for WarplineConfig { Self { enabled: false, actor: "loomweave-mcp".to_owned(), + timeout_seconds: 10, } } } diff --git a/crates/loomweave-federation/src/warpline.rs b/crates/loomweave-federation/src/warpline.rs index 6da77591..19d473b1 100644 --- a/crates/loomweave-federation/src/warpline.rs +++ b/crates/loomweave-federation/src/warpline.rs @@ -22,16 +22,23 @@ //! complete answer, not an error (lock §1A "Keying"). //! //! Transport: Warpline is an MCP-stdio member (no HTTP read API), so it is -//! launched as a subprocess and driven over MCP stdio — the same mechanism the -//! Filigree MCP-tool calls use (`filigree::run_mcp_tool`). Kept self-contained -//! here rather than sharing filigree's private frame helpers. +//! launched as a subprocess and driven over MCP stdio. Unlike Loomweave's own +//! language plugins (Content-Length framed, ADR-002), `warpline-mcp` speaks +//! **newline-delimited** JSON-RPC — one compact JSON object per line, one +//! response line per request line. This client frames to match (an earlier +//! Content-Length copy of the Filigree path hung the read against warpline's +//! line transport). The whole exchange runs in a worker thread bounded by a +//! per-call timeout: a warpline child that accepts the connection and never +//! answers is killed, and the surface degrades to `warpline-unreachable` rather +//! than hanging. use std::collections::HashMap; -use std::io::{BufReader, Write}; +use std::io::{BufRead, BufReader, Write}; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; +use std::sync::mpsc::RecvTimeoutError; +use std::time::Duration; -use loomweave_core::plugin::{ContentLengthCeiling, Frame, read_frame, write_frame}; use serde::Deserialize; use thiserror::Error; @@ -101,11 +108,34 @@ pub struct ChurnEntity { } /// The `data` payload of the frozen churn envelope (`data.items` is the part -/// loomweave joins on). +/// loomweave joins on; `data.overflow` discloses truncation). #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct ChurnData { #[serde(default)] pub items: Vec, + /// Warpline's overflow carrier — present when the read was bounded. + #[serde(default)] + pub overflow: Option, +} + +/// The `data.overflow` carrier from the frozen envelope. Warpline bounds an +/// oversized churn read: it keeps a lead window in-band (`returned` of `total`) +/// and spills the FULL list to `dumped_to`, reporting `reason_class: "partial"` +/// (else `"clean"`). Loomweave reads `reason_class` / `total` / `returned` to +/// DISCLOSE that a ranking is partial — so a truncated-out entity's +/// `churn_count: 0` (warpline *has* a record) is never conflated with a genuine +/// never-observed `0`. Reading `dumped_to` for complete coverage of a scope +/// larger than warpline's in-band cap is a tracked follow-up (deep-pagination). +#[derive(Debug, Clone, PartialEq, Default, Deserialize)] +pub struct ChurnOverflow { + #[serde(default)] + pub reason_class: Option, + #[serde(default)] + pub total: Option, + #[serde(default)] + pub returned: Option, + #[serde(default)] + pub dumped_to: Option, } /// The full FROZEN success envelope warpline returns @@ -139,6 +169,46 @@ impl ChurnCountResponse { } by_key } + + /// When warpline truncated the churn read to an in-band lead (overflow + /// `reason_class: "partial"`), the `(total, counted)` pair: `total` refs were + /// requested but only `counted` carry real counts in-band — the rest are + /// absent from `data.items`, so a join reads them as `0`. `None` when the + /// answer is complete (`clean` / no overflow), in which case every `0` is a + /// genuine never-observed count. The caller discloses the partial case so the + /// two kinds of `0` are not conflated. + #[must_use] + pub fn overflow_partial(&self) -> Option<(i64, i64)> { + let overflow = self.data.overflow.as_ref()?; + if overflow.reason_class.as_deref() != Some("partial") { + return None; + } + let items_len = i64::try_from(self.data.items.len()).unwrap_or(i64::MAX); + let counted = overflow.returned.unwrap_or(items_len); + let total = overflow.total.unwrap_or(counted); + Some((total, counted)) + } + + /// Count of returned items that are KEYING MISSES — refs warpline could not + /// resolve, returned with `churn_count: 0`. Distinguishable from a genuine + /// never-observed `0`: warpline echoes a non-null `locator` for a resolved + /// entity (the `entity_keys.locator` column is NOT NULL), but `locator: null` + /// for an unresolved **SEI** ref (producer `commands.py`: a resolve miss on a + /// sei-kind ref sets `{sei: , locator: null}`). So `locator.is_none()` + /// flags a ref whose `0` means "warpline has no key for this entity" (its real + /// churn is unknown, not zero) — the loomweave↔warpline keying/dialect gap. + /// + /// Caveat: only catches SEI-keyed misses. A *locator*-kind miss echoes the + /// sent value back as `locator`, so it is indistinguishable from a genuine + /// never-observed `0` here — the caller's disclosure is bounded accordingly. + #[must_use] + pub fn unresolved_ref_count(&self) -> usize { + self.data + .items + .iter() + .filter(|item| item.entity.locator.is_none()) + .count() + } } /// Parse the FROZEN churn envelope body. Pins the wire contract: a body that is @@ -204,8 +274,16 @@ pub trait WarplineLookup: Send + Sync { /// `config.enabled`; an absent client (`None`) is the honest-degrade default. #[derive(Debug, Clone)] pub struct WarplineMcpClient { - actor: String, + /// The resolved launch command (program, args) for `warpline-mcp`. Resolved + /// once at construction from the env override / `warpline mcp` shim so the + /// transport path stays env-free and unit-testable (a test injects a fake + /// newline-MCP server here directly). + command: (String, Vec), + /// The repo root sent as the required `repo` argument and used as the + /// subprocess working directory. project_root: Option, + /// Per-call round-trip bound; a hung warpline is killed at this deadline. + timeout: Duration, } impl WarplineMcpClient { @@ -218,8 +296,11 @@ impl WarplineMcpClient { return None; } Some(Self { - actor: config.actor.clone(), + command: resolve_warpline_mcp_command(project_root), project_root: project_root.map(Path::to_path_buf), + // Floor a degenerate `0` to 1s so the knob can never mean "never + // wait" (which would make every call instantly time out). + timeout: Duration::from_secs(config.timeout_seconds.max(1)), }) } @@ -228,104 +309,108 @@ impl WarplineMcpClient { arguments: &serde_json::Value, ) -> Result { let tool = WARPLINE_CHURN_TOOL; - let (program, args) = resolve_warpline_mcp_command(self.project_root.as_deref()); - let mut child = Command::new(&program) - .args(&args) + let (program, args) = &self.command; + let mut child = Command::new(program) + .args(args) + // stderr is deliberately discarded: this client never drains it, so + // a large warpline traceback that filled a piped stderr (64 KiB) + // would block warpline mid-write. Diagnostics surface through the + // honest-degrade reason, not warpline's stderr. .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .stderr(Stdio::piped()) + .stderr(Stdio::null()) .current_dir( self.project_root .as_deref() .unwrap_or_else(|| Path::new(".")), ) .spawn() - .map_err(|err| WarplineClientError::McpTool { - tool: tool.to_owned(), - message: format!("spawn {program}: {err}"), - })?; + .map_err(|err| mcp_tool_error(tool, &format!("spawn {program}: {err}")))?; let mut stdin = child .stdin .take() - .ok_or_else(|| WarplineClientError::McpTool { - tool: tool.to_owned(), - message: "child stdin unavailable".to_owned(), - })?; + .ok_or_else(|| mcp_tool_error(tool, "child stdin unavailable"))?; let stdout = child .stdout .take() - .ok_or_else(|| WarplineClientError::McpTool { - tool: tool.to_owned(), - message: "child stdout unavailable".to_owned(), - })?; - let mut stdout = BufReader::new(stdout); - - write_mcp_frame( - &mut stdin, - &serde_json::json!({ - "jsonrpc": "2.0", - "id": "loomweave-init", - "method": "initialize", - "params": { - "protocolVersion": "2025-11-25", - "capabilities": {}, - "clientInfo": { "name": "loomweave", "version": env!("CARGO_PKG_VERSION") } - } - }), - tool, - )?; - let _ = read_mcp_frame(&mut stdout, "loomweave-init", tool)?; - write_mcp_frame( - &mut stdin, - &serde_json::json!({ - "jsonrpc": "2.0", - "method": "notifications/initialized", - "params": {} - }), - tool, - )?; - write_mcp_frame( - &mut stdin, - &serde_json::json!({ - "jsonrpc": "2.0", - "id": "loomweave-call", - "method": "tools/call", - "params": { "name": tool, "arguments": arguments } - }), - tool, - )?; - drop(stdin); - - let response = read_mcp_frame(&mut stdout, "loomweave-call", tool)?; - let _ = child.wait(); - if let Some(error) = response.get("error") { - return Err(WarplineClientError::McpTool { - tool: tool.to_owned(), - message: error.to_string(), - }); - } - let text = response - .get("result") - .and_then(|result| result.get("content")) - .and_then(serde_json::Value::as_array) - .and_then(|content| content.first()) - .and_then(|item| item.get("text")) - .and_then(serde_json::Value::as_str) - .ok_or_else(|| WarplineClientError::McpTool { - tool: tool.to_owned(), - message: format!("missing result.content[0].text in response {response}"), - })?; - let parsed: serde_json::Value = serde_json::from_str(text) - .map_err(|err| WarplineClientError::Contract(WarplineContractError::from(err)))?; - // A frozen `warpline.error.v1` body (or any `{ "error": … }`) is an - // honest "could not answer", surfaced so the caller degrades. - if let Some(error) = parsed.get("error") { - return Err(WarplineClientError::WarplineError { - tool: tool.to_owned(), - message: error.to_string(), - }); + .ok_or_else(|| mcp_tool_error(tool, "child stdout unavailable"))?; + + // MCP handshake + the one churn call. `warpline-mcp` is a stateless + // per-line dispatcher (it does not require the handshake), but sending + // it keeps us a correct MCP client; the reader skips the init result and + // the notification's spurious `id: null` error by id. + let init = serde_json::json!({ + "jsonrpc": "2.0", + "id": "loomweave-init", + "method": "initialize", + "params": { + // A protocol version warpline advertises (2024-11-05 / 2025-03-26); + // warpline negotiates down rather than rejecting, but match anyway. + "protocolVersion": "2025-03-26", + "capabilities": {}, + "clientInfo": { "name": "loomweave", "version": env!("CARGO_PKG_VERSION") } + } + }); + let initialized = serde_json::json!({ + "jsonrpc": "2.0", + "method": "notifications/initialized", + "params": {} + }); + let call = serde_json::json!({ + "jsonrpc": "2.0", + "id": "loomweave-call", + "method": "tools/call", + "params": { "name": tool, "arguments": arguments } + }); + + // Drive the exchange on a worker thread so the blocking write+read is + // bounded by `recv_timeout`. On timeout we kill the child, which closes + // its pipes and unblocks the worker. The handshake responses are tiny + // (well under a pipe buffer), so warpline never blocks on stdout-write + // while we are still writing stdin — no write/read deadlock. + let (tx, rx) = std::sync::mpsc::channel(); + let worker = std::thread::spawn(move || { + let outcome = (|| -> Result { + write_json_line(&mut stdin, &init, tool)?; + write_json_line(&mut stdin, &initialized, tool)?; + write_json_line(&mut stdin, &call, tool)?; + stdin + .flush() + .map_err(|err| mcp_tool_error(tool, &format!("flush warpline stdin: {err}")))?; + // EOF on stdin ends warpline's read loop so it exits cleanly. + drop(stdin); + let mut reader = BufReader::new(stdout); + read_response_for_id(&mut reader, "loomweave-call", tool) + })(); + let _ = tx.send(outcome); + }); + + match rx.recv_timeout(self.timeout) { + Ok(outcome) => { + let _ = child.wait(); + let _ = worker.join(); + envelope_from_response(&outcome?, tool) + } + Err(RecvTimeoutError::Timeout) => { + let _ = child.kill(); + let _ = child.wait(); + let _ = worker.join(); + Err(mcp_tool_error( + tool, + &format!( + "warpline did not respond within {}s", + self.timeout.as_secs() + ), + )) + } + Err(RecvTimeoutError::Disconnected) => { + let _ = child.wait(); + Err(mcp_tool_error( + tool, + "warpline worker thread disconnected before responding", + )) + } } - Ok(parsed) } } @@ -341,68 +426,144 @@ impl WarplineLookup for WarplineMcpClient { // own scoped set from the returned counts regardless. "sort_by": "churn_count", "sort_order": "desc", - "actor": self.actor.clone(), }); - if let (Some(window), Some(obj)) = (window, arguments.as_object_mut()) { - obj.insert("window".to_owned(), window.clone()); + if let Some(obj) = arguments.as_object_mut() { + // `repo` is REQUIRED by warpline (`_repo_arg`); warpline keys its + // store by it, so it must match the repo warpline indexed (the + // subprocess working dir). Omitting it made every call error; + // sending the wrong path silently resolves every ref to 0. + if let Some(root) = self.project_root.as_deref() { + obj.insert( + "repo".to_owned(), + serde_json::json!(root.display().to_string()), + ); + } + // Page the whole ref set in-band: warpline defaults `limit` to 100, + // so without this it would echo counts for only the top 100 refs by + // churn and the join would read every other candidate as `0`. `.max(1)` + // dodges warpline's `limit <= 0` rejection on an empty ref set. This + // covers the page cap; warpline's separate overflow cap (in-band lead) + // still bounds very large scopes — disclosed via `overflow_partial`. + obj.insert( + "limit".to_owned(), + serde_json::json!(entity_refs.len().max(1)), + ); + if let Some(window) = window { + obj.insert("window".to_owned(), window.clone()); + } } - // NOTE (known limitation): there is no per-call timeout. The subprocess - // round-trip is short-lived in practice, but a warpline child that - // accepts the connection and never responds would block this read. A - // `wait_timeout` wrapper is a tracked follow-up (matches the Filigree MCP - // path's current behaviour); a config knob is deliberately NOT advertised - // until it is honoured (input-affordances-are-promises). + // `actor` is deliberately NOT sent: it is not in warpline's frozen churn + // schema (its `inputSchema` is `additionalProperties: false`), so we omit + // it. (Warpline does not enforce that at runtime — it ignores unknown + // params — but the contract is what we conform to.) let value = self.run_churn_tool(&arguments)?; let body = value.to_string(); parse_churn_count_response(&body).map_err(WarplineClientError::Contract) } } -fn write_mcp_frame( +/// Build a transport-level [`WarplineClientError::McpTool`] (every variant the +/// caller treats as honest-unavailable). +fn mcp_tool_error(tool: &str, message: &str) -> WarplineClientError { + WarplineClientError::McpTool { + tool: tool.to_owned(), + message: message.to_owned(), + } +} + +/// Write one newline-delimited JSON-RPC message: compact JSON (no embedded +/// newlines) followed by `\n`, the framing `warpline-mcp` reads line-by-line. +fn write_json_line( writer: &mut impl Write, value: &serde_json::Value, tool: &str, ) -> Result<(), WarplineClientError> { - let body = serde_json::to_vec(value).map_err(|err| WarplineClientError::McpTool { - tool: tool.to_owned(), - message: format!("serialize MCP request: {err}"), - })?; - write_frame(writer, &Frame { body }).map_err(|err| WarplineClientError::McpTool { - tool: tool.to_owned(), - message: format!("write MCP frame: {err}"), - }) + let mut body = serde_json::to_vec(value) + .map_err(|err| mcp_tool_error(tool, &format!("serialize MCP request: {err}")))?; + body.push(b'\n'); + writer + .write_all(&body) + .map_err(|err| mcp_tool_error(tool, &format!("write MCP request: {err}"))) } -fn read_mcp_frame( - reader: &mut impl std::io::BufRead, +/// Read newline-delimited JSON-RPC responses until one carries `expected_id`, +/// skipping the init result and the notification's `id: null` error. EOF before +/// a match is a transport fault (honest-unavailable, never a clean empty). +fn read_response_for_id( + reader: &mut impl BufRead, expected_id: &str, tool: &str, ) -> Result { + let mut line = String::new(); loop { - let frame = read_frame(reader, ContentLengthCeiling::DEFAULT).map_err(|err| { - WarplineClientError::McpTool { - tool: tool.to_owned(), - message: format!("read MCP frame: {err}"), - } - })?; - let value: serde_json::Value = - serde_json::from_slice(&frame.body).map_err(|err| WarplineClientError::McpTool { - tool: tool.to_owned(), - message: format!("parse MCP response: {err}"), - })?; - if value - .get("id") - .and_then(serde_json::Value::as_str) - .is_some_and(|id| id == expected_id) - { + line.clear(); + let read = reader + .read_line(&mut line) + .map_err(|err| mcp_tool_error(tool, &format!("read MCP response: {err}")))?; + if read == 0 { + return Err(mcp_tool_error( + tool, + "warpline closed its output before answering the churn call", + )); + } + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + let value: serde_json::Value = serde_json::from_str(trimmed) + .map_err(|err| mcp_tool_error(tool, &format!("parse MCP response line: {err}")))?; + if value.get("id").and_then(serde_json::Value::as_str) == Some(expected_id) { return Ok(value); } + // A non-matching id (init result, the notification's id:null error) is + // skipped; keep reading for our call's response. } } +/// Pull the FROZEN churn envelope out of a `tools/call` JSON-RPC response: a +/// JSON-RPC `error` degrades; otherwise prefer `result.structuredContent` (the +/// envelope as an object) and fall back to `result.content[0].text` (the same +/// envelope as a JSON string) — `warpline-mcp` returns both. +fn envelope_from_response( + response: &serde_json::Value, + tool: &str, +) -> Result { + if let Some(error) = response.get("error").filter(|err| !err.is_null()) { + return Err(WarplineClientError::WarplineError { + tool: tool.to_owned(), + message: error.to_string(), + }); + } + let result = response + .get("result") + .ok_or_else(|| mcp_tool_error(tool, &format!("response missing result: {response}")))?; + if let Some(structured) = result.get("structuredContent").filter(|v| !v.is_null()) { + return Ok(structured.clone()); + } + if let Some(text) = result + .get("content") + .and_then(serde_json::Value::as_array) + .and_then(|content| content.first()) + .and_then(|item| item.get("text")) + .and_then(serde_json::Value::as_str) + { + return serde_json::from_str(text) + .map_err(|err| WarplineClientError::Contract(WarplineContractError::from(err))); + } + Err(mcp_tool_error( + tool, + &format!("response result has neither structuredContent nor content[0].text: {response}"), + )) +} + /// Resolve the command that launches warpline's MCP stdio server. Env override /// `LOOMWEAVE_WARPLINE_MCP_COMMAND` (with a `{project}` placeholder) wins; else -/// the `warpline mcp` shim. +/// the `warpline-mcp` binary. +/// +/// NOTE: the launcher is the standalone `warpline-mcp` executable, NOT +/// `warpline mcp` — warpline's CLI has no `mcp` subcommand, so `warpline mcp` +/// exits with a usage error and the write to its (already-closed) stdin fails +/// with a broken pipe. The MCP stdio server only ships as `warpline-mcp`. fn resolve_warpline_mcp_command(project_root: Option<&Path>) -> (String, Vec) { if let Ok(raw) = std::env::var("LOOMWEAVE_WARPLINE_MCP_COMMAND") { let mut parts: Vec = raw @@ -417,13 +578,198 @@ fn resolve_warpline_mcp_command(project_root: Option<&Path>) -> (String, Vec 1 else "both" +sidecar = sys.argv[2] if len(sys.argv) > 2 else None + +ENVELOPE = { + "schema": "warpline.entity_churn_count.v1", + "ok": True, + "data": {"items": [ + {"entity": {"sei": "loomweave:eid:aaa", "locator": "python:function:m::alpha"}, + "churn_count": 7, "first_changed_at": "2026-05-01T00:00:00Z", + "last_changed_at": "2026-06-13T00:00:00Z", "last_actor": "agent:codex"}, + {"entity": {"sei": "loomweave:eid:bbb", "locator": "python:function:m::beta"}, + "churn_count": 2, "first_changed_at": None, + "last_changed_at": "2026-06-01T00:00:00Z", "last_actor": None}, + {"entity": {"sei": "loomweave:eid:ccc", "locator": "python:function:m::gamma"}, + "churn_count": 0, "first_changed_at": None, + "last_changed_at": None, "last_actor": None} + ]} +} + +def send(obj): + sys.stdout.write(json.dumps(obj) + "\n") + sys.stdout.flush() + +for line in sys.stdin: + line = line.strip() + if not line: + continue + req = json.loads(line) + method = req.get("method") + rid = req.get("id") + if method == "initialize": + send({"jsonrpc": "2.0", "id": rid, "result": { + "protocolVersion": "2025-03-26", + "serverInfo": {"name": "fake-warpline", "version": "0"}, + "capabilities": {"tools": {}}}}) + elif method == "tools/call": + args = (req.get("params") or {}).get("arguments") or {} + if sidecar: + with open(sidecar, "w") as f: + json.dump(args, f) + if mode == "hang": + time.sleep(60) + continue + result = {"content": [{"type": "text", "text": json.dumps(ENVELOPE, sort_keys=True)}]} + if mode != "text_only": + result["structuredContent"] = ENVELOPE + send({"jsonrpc": "2.0", "id": rid, "result": result}) + elif rid is not None: + send({"jsonrpc": "2.0", "id": rid, "error": {"code": -32601, "message": "unknown"}}) + else: + send({"jsonrpc": "2.0", "id": None, "error": {"code": -32601, "message": "unknown method"}}) +"#; + + fn write_fake_server(dir: &Path) -> PathBuf { + let script = dir.join("fake_warpline.py"); + std::fs::write(&script, FAKE_SERVER_PY).expect("write fake warpline server"); + script + } + + fn fake_client( + script: &Path, + mode: &str, + sidecar: Option<&Path>, + project_root: &Path, + timeout_secs: u64, + ) -> WarplineMcpClient { + let mut args = vec![script.display().to_string(), mode.to_owned()]; + if let Some(sidecar) = sidecar { + args.push(sidecar.display().to_string()); + } + WarplineMcpClient { + command: ("python3".to_owned(), args), + project_root: Some(project_root.to_path_buf()), + timeout: Duration::from_secs(timeout_secs), + } + } + + /// The transport regression: over the REAL newline-delimited subprocess + /// transport (not the injected fake `WarplineLookup`), the churn read + /// completes, sends the required `repo`, omits the unsupported `actor`, and + /// parses the frozen envelope. This is the bug the consumer hit: the prior + /// Content-Length framing hung against warpline's line transport, and the + /// call omitted `repo` / sent `actor`. + #[test] + fn real_transport_sends_repo_omits_actor_and_parses_envelope() { + let dir = tempfile::tempdir().expect("temp dir"); + let script = write_fake_server(dir.path()); + let sidecar = dir.path().join("args.json"); + let client = fake_client(&script, "both", Some(&sidecar), dir.path(), 10); + + let refs = vec![WarplineEntityRef::for_entity( + "python:function:m::alpha", + Some("loomweave:eid:aaa"), + )]; + let response = client + .entity_churn_counts(&refs, None) + .expect("churn read succeeds over the newline-delimited transport"); + assert_eq!( + response.data.items.len(), + 3, + "the frozen 3-item envelope round-tripped" + ); + + let args: serde_json::Value = + serde_json::from_str(&std::fs::read_to_string(&sidecar).expect("sidecar written")) + .expect("sidecar JSON"); + assert_eq!( + args["repo"], + serde_json::json!(dir.path().display().to_string()), + "the required repo arg is sent, equal to the project root" + ); + assert!( + args.get("actor").is_none(), + "the unsupported actor param must NOT be sent: {args}" + ); + assert!( + args.get("entity_refs").is_some(), + "entity_refs is forwarded: {args}" + ); + } + + /// Requirement #4: when warpline returns only the text envelope (no + /// `structuredContent`), the consumer still parses it via the fallback. + #[test] + fn real_transport_parses_text_envelope_when_structured_content_absent() { + let dir = tempfile::tempdir().expect("temp dir"); + let script = write_fake_server(dir.path()); + let client = fake_client(&script, "text_only", None, dir.path(), 10); + + let refs = vec![WarplineEntityRef::for_entity( + "python:function:m::alpha", + Some("loomweave:eid:aaa"), + )]; + let response = client + .entity_churn_counts(&refs, None) + .expect("text-envelope fallback parses"); + assert_eq!(response.data.items.len(), 3); + } + + /// Requirement #5: a warpline child that completes the handshake then never + /// answers the call must DEGRADE via the bounded timeout, not hang forever. + #[test] + fn real_transport_times_out_instead_of_hanging() { + let dir = tempfile::tempdir().expect("temp dir"); + let script = write_fake_server(dir.path()); + let client = fake_client(&script, "hang", None, dir.path(), 1); + + let refs = vec![WarplineEntityRef::for_entity( + "python:function:m::alpha", + Some("loomweave:eid:aaa"), + )]; + let start = std::time::Instant::now(); + let err = client + .entity_churn_counts(&refs, None) + .expect_err("a hung warpline must error, not hang"); + let elapsed = start.elapsed(); + + assert!( + elapsed < Duration::from_secs(15), + "must return promptly via the timeout (1s), took {elapsed:?}" + ); + assert!( + matches!(err, WarplineClientError::McpTool { .. }), + "a transport-level fault: {err}" + ); + assert!( + err.to_string().contains("did not respond"), + "the timeout reason is surfaced for honest-degrade: {err}" + ); + } + /// The recorded FROZEN `warpline.entity_churn_count.v1` envelope used as the /// GV-LW-2 producer fixture: 3 refs, two observed (`churn_count >= 1`), one /// never-observed (`churn_count: 0`, present, not omitted, not an error). @@ -566,4 +912,79 @@ mod tests { }; assert!(WarplineMcpClient::from_config(&enabled, None).is_some()); } + + /// Regression guard for the headline live-found defect: the default launcher + /// is the standalone `warpline-mcp` binary, NOT `warpline mcp` (which is not a + /// warpline subcommand and exits with a usage error → broken pipe). The + /// transport tests inject the command, so this is the only check that pins the + /// default resolution. Guarded against the env override leaking in from the + /// surrounding environment. + #[test] + fn default_command_is_warpline_mcp_binary_not_subcommand() { + if std::env::var_os("LOOMWEAVE_WARPLINE_MCP_COMMAND").is_some() { + return; // env override active; the default is not under test here + } + let (program, args) = resolve_warpline_mcp_command(None); + assert_eq!(program, "warpline-mcp"); + assert!( + args.is_empty(), + "the MCP server takes no subcommand args: {args:?}" + ); + } + + /// `overflow_partial` distinguishes a TRUNCATED read (warpline kept an in-band + /// lead and spilled the rest) from a complete `clean` answer — the signal the + /// consumer uses so a truncated-out `0` is not read as never-observed. + #[test] + fn overflow_partial_reports_truncation_only_when_partial() { + // clean (no overflow) → None: every 0 is a genuine never-observed count. + let clean = parse_churn_count_response(GV_LW_2_FIXTURE).unwrap(); + assert_eq!(clean.overflow_partial(), None); + + // partial → Some((total, counted)) read from warpline's own carrier. + let partial = parse_churn_count_response( + r#"{ + "schema": "warpline.entity_churn_count.v1", "ok": true, + "data": { + "items": [{"entity": {"locator": "python:function:m::a"}, "churn_count": 5}], + "overflow": {"total": 574, "returned": 200, + "dumped_to": "/abs/.weft/warpline/overflow/x.json", + "reason_class": "partial", + "cause": "574 items exceeded the 200-item in-band cap", + "fix": "read the full list from the dump"} + } + }"#, + ) + .unwrap(); + assert_eq!(partial.overflow_partial(), Some((574, 200))); + } + + /// `unresolved_ref_count` flags items warpline could not key-match (null + /// locator) and leaves genuine never-observed items (resolved, non-null + /// locator, count 0) alone — the two kinds of `0` the consumer must not + /// conflate. + #[test] + fn unresolved_ref_count_flags_null_locator_only() { + // GV-LW-2 fixture: all 3 items carry a locator (gamma is a genuine + // never-observed 0, resolved) → zero unresolved. + let resolved = parse_churn_count_response(GV_LW_2_FIXTURE).unwrap(); + assert_eq!(resolved.unresolved_ref_count(), 0); + + // A SEI-ref miss: warpline echoes the sei but a null locator + count 0. + let with_miss = parse_churn_count_response( + r#"{ + "schema": "warpline.entity_churn_count.v1", "ok": true, + "data": {"items": [ + {"entity": {"sei": "loomweave:eid:hit", "locator": "python:function:m::a"}, + "churn_count": 4}, + {"entity": {"sei": "loomweave:eid:miss", "locator": null}, + "churn_count": 0}, + {"entity": {"sei": "loomweave:eid:miss2", "locator": null}, + "churn_count": 0} + ]} + }"#, + ) + .unwrap(); + assert_eq!(with_miss.unresolved_ref_count(), 2); + } } diff --git a/crates/loomweave-mcp/src/catalogue/shortcuts.rs b/crates/loomweave-mcp/src/catalogue/shortcuts.rs index be9f4275..45b42d8e 100644 --- a/crates/loomweave-mcp/src/catalogue/shortcuts.rs +++ b/crates/loomweave-mcp/src/catalogue/shortcuts.rs @@ -1059,8 +1059,60 @@ fn rank_and_finalize_churn( // Provenance + honest-empty note. A genuinely empty answer (warpline present, // no in-scope entity carried a recorded change) is honest-empty, never clean. let is_empty = response_json["page"]["total"] == json!(0); + // Warpline truncation disclosure: when warpline bounded the read to an in-band + // lead (more candidates than its overflow cap), the entities ranked below the + // lead are absent from the join and graft `churn_count: 0` — a TRUNCATION, not + // a never-observed 0. Disclose it so the two kinds of 0 are not conflated + // (this is the honesty floor; reading warpline's overflow dump for complete + // coverage of an over-cap scope is a tracked follow-up — narrow `scope` for + // exact counts meanwhile). Bites `recently_changed` hardest: a recent but + // low-total-churn entity ranked outside the lead is dropped as count-0. + let partial = response.overflow_partial(); + // Keying-miss disclosure, symmetric with `churn_truncated`: warpline returns + // `churn_count: 0` with a null locator for a ref it could not key-match (an + // SEI loomweave holds but warpline has not recorded, or a divergent locator + // dialect). That 0 is "real churn unknown", NOT a never-observed 0 — without + // this a scoped query over key-missed entities reads as "this code never + // changes" (the absence-as-clean failure this module exists to prevent). Only + // SEI-keyed misses are detectable (see `unresolved_ref_count`). + let unresolved = response.unresolved_ref_count(); if let Some(object) = response_json.as_object_mut() { mode.tag(object); + if unresolved > 0 { + object.insert( + "churn_unresolved".to_owned(), + json!({ + "count": unresolved, + "reason": format!( + "warpline could not key-match {unresolved} in-scope candidate(s) \ + (loomweave sent an SEI warpline has not recorded, or the locator \ + dialect differs); they are shown with churn_count 0 here but their real \ + churn is UNKNOWN, not zero — a federation keying gap, not a \ + never-observed 0. (recently_changed drops them entirely.)" + ), + }), + ); + } + if let Some((total, counted)) = partial { + let uncounted = total.saturating_sub(counted).max(0); + object.insert( + "churn_truncated".to_owned(), + json!({ + "truncated": true, + "counted": counted, + "total_candidates": total, + "uncounted": uncounted, + "reason": format!( + "warpline truncated the churn read to its top {counted} entities by \ + churn_count; {uncounted} in-scope candidate(s) ranked below that are \ + shown with churn_count 0 here and may be undercounted — a truncation, \ + NOT a never-observed 0. Narrow `scope` for exact counts; complete \ + over-cap coverage (reading warpline's overflow dump) is a tracked \ + follow-up." + ), + }), + ); + } if is_empty { object.insert( "signal".to_owned(), diff --git a/crates/loomweave-mcp/tests/warpline_churn_consumer.rs b/crates/loomweave-mcp/tests/warpline_churn_consumer.rs index 28af6c8e..ab83cb05 100644 --- a/crates/loomweave-mcp/tests/warpline_churn_consumer.rs +++ b/crates/loomweave-mcp/tests/warpline_churn_consumer.rs @@ -105,6 +105,74 @@ impl WarplineLookup for UnreachableWarplineClient { } } +/// A warpline client whose envelope carries an overflow `reason_class: "partial"` +/// — warpline truncated the read to an in-band lead. Echoes a real count for +/// alpha only; beta + gamma are absent from `items` (the truncated tail), so the +/// consumer grafts 0 onto them. The consumer must DISCLOSE this truncation +/// (`churn_truncated`) rather than letting those 0s read as never-observed. +struct PartialOverflowWarplineClient; + +impl WarplineLookup for PartialOverflowWarplineClient { + fn entity_churn_counts( + &self, + _entity_refs: &[WarplineEntityRef], + _window: Option<&Value>, + ) -> Result { + // total 3 candidates, only 1 returned in-band (the lead); reason partial. + let envelope = format!( + r#"{{ + "schema": "warpline.entity_churn_count.v1", "ok": true, + "data": {{ + "items": [ + {{"entity": {{"sei": "{SEI_ALPHA}", "locator": "{LOC_ALPHA}"}}, + "churn_count": 7, "last_changed_at": "2026-06-13T00:00:00Z", + "last_actor": "agent:codex"}} + ], + "overflow": {{"total": 3, "returned": 1, + "dumped_to": "/abs/.weft/warpline/overflow/churn.json", + "reason_class": "partial", + "cause": "3 items exceeded the in-band cap", + "fix": "read the dump"}} + }} + }}"# + ); + parse_churn_count_response(&envelope).map_err(WarplineClientError::Contract) + } +} + +/// A warpline client modelling a KEYING MISS: loomweave sends SEI refs warpline +/// has not recorded, so warpline echoes each ref with `churn_count: 0` and a +/// NULL locator. The consumer must DISCLOSE this (`churn_unresolved`) rather than +/// let the zeros read as "this code never changes" — the lacuna failure mode +/// (warpline keys by path-locator with null sei; loomweave sends dotted-locator +/// SEIs that miss). +struct UnresolvedKeyWarplineClient; + +impl WarplineLookup for UnresolvedKeyWarplineClient { + fn entity_churn_counts( + &self, + entity_refs: &[WarplineEntityRef], + _window: Option<&Value>, + ) -> Result { + // One item per ref, each a miss: sei echoed, locator null, count 0. + let items: Vec = entity_refs + .iter() + .map(|r| { + format!( + r#"{{"entity": {{"sei": "{}", "locator": null}}, "churn_count": 0}}"#, + r.value + ) + }) + .collect(); + let envelope = format!( + r#"{{"schema": "warpline.entity_churn_count.v1", "ok": true, + "data": {{"items": [{}]}}}}"#, + items.join(", ") + ); + parse_churn_count_response(&envelope).map_err(WarplineClientError::Contract) + } +} + fn open_project() -> (tempfile::TempDir, std::path::PathBuf, Connection) { let project = tempfile::tempdir().expect("temp project"); let dir = project.path().join(".weft/loomweave"); @@ -350,3 +418,76 @@ async fn high_churn_degrades_honestly_when_warpline_unreachable() { "the warpline error reason is surfaced, not swallowed" ); } + +/// Honest-truncation — warpline answered but bounded the read to an in-band lead +/// (overflow `reason_class: "partial"`). The truncated-out entities graft +/// `churn_count: 0`, but the surface must DISCLOSE the truncation +/// (`churn_truncated`) so those 0s are not conflated with never-observed 0s. This +/// is the honesty floor for an over-cap scope (complete coverage via the overflow +/// dump is a tracked follow-up). +#[tokio::test] +async fn high_churn_discloses_warpline_overflow_truncation() { + let (project, db, conn) = open_project(); + seed_three_entities(&conn); + let state = state_with_warpline(project.path(), &db, Arc::new(PartialOverflowWarplineClient)); + + let envelope = call_tool(&state, "entity_high_churn_list", json!({})).await; + let result = ok_payload(&envelope); + + // Not a hard error; the in-band entity carries its real count. + assert_eq!(result["churn_source"], json!("warpline")); + let alpha = result["entities"] + .as_array() + .expect("entities") + .iter() + .find(|e| e["id"] == json!(LOC_ALPHA)) + .expect("alpha present"); + assert_eq!(alpha["churn_count"], json!(7)); + + // The truncation is disclosed with warpline's own counts — NOT silently + // swallowed into all-plausible zeros. + let truncated = &result["churn_truncated"]; + assert_eq!(truncated["truncated"], json!(true)); + assert_eq!(truncated["total_candidates"], json!(3)); + assert_eq!(truncated["counted"], json!(1)); + assert_eq!(truncated["uncounted"], json!(2)); + assert!( + truncated["reason"] + .as_str() + .unwrap_or_default() + .to_lowercase() + .contains("truncat"), + "the disclosure must name the truncation: {truncated}" + ); +} + +/// Honest keying-miss — warpline answered but could not key-match the refs (null +/// locator, count 0). A non-empty all-zero result must DISCLOSE the keying gap +/// (`churn_unresolved`) so the zeros are not read as "never changes" — the +/// scoped-all-zeros failure the overflow disclosure's twin must also cover. +#[tokio::test] +async fn high_churn_discloses_warpline_keying_miss() { + let (project, db, conn) = open_project(); + seed_three_entities(&conn); + let state = state_with_warpline(project.path(), &db, Arc::new(UnresolvedKeyWarplineClient)); + + let envelope = call_tool(&state, "entity_high_churn_list", json!({})).await; + let result = ok_payload(&envelope); + + // Non-empty (3 candidates ranked) but every count is 0 — and that is DISCLOSED + // as a keying miss, not silently shipped as a clean all-zero answer. + assert_eq!(result["page"]["total"], json!(3)); + assert_eq!(result["churn_source"], json!("warpline")); + assert_eq!(result["churn_unresolved"]["count"], json!(3)); + assert!( + result["churn_unresolved"]["reason"] + .as_str() + .unwrap_or_default() + .to_lowercase() + .contains("key"), + "the disclosure must name the keying gap: {}", + result["churn_unresolved"] + ); + // A genuine all-zero (resolved, never-observed) must NOT trip this — covered by + // the GV-LW-2 fixture where every item has a non-null locator (unresolved 0). +}