diff --git a/Cargo.lock b/Cargo.lock index f327c15c..05f1f934 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4159,6 +4159,7 @@ dependencies = [ "lance", "lance-arrow", "lance-graph-catalog", + "lance-graph-contract", "lance-graph-planner", "lance-index", "lance-linalg", diff --git a/crates/causal-edge/src/edge.rs b/crates/causal-edge/src/edge.rs index 3a93db04..c7785f5b 100644 --- a/crates/causal-edge/src/edge.rs +++ b/crates/causal-edge/src/edge.rs @@ -225,6 +225,37 @@ impl CausalEdge64 { | (((m as u8 as u64) & BITS3_MASK) << CAUSAL_SHIFT); } + /// Match if this edge's causal mask contains AT LEAST the bits in `query_mask`. + /// + /// `query_mask` is interpreted as the low 3 bits of the Pearl 2³ packing + /// (S=0b100, P=0b010, O=0b001 — see [`CausalMask`]). Higher bits are ignored. + /// + /// This is the query-side predicate used by graph WHERE clauses to filter + /// edges by causal type. For example, `query_mask = CausalMask::PO as u8` + /// (`0b011`) matches every edge whose causal mask has at least the P and O + /// planes active — i.e. interventional edges (`PO`) and counterfactual + /// edges (`SPO`), but not pure association (`SO`). + /// + /// Semantics: + /// - `query_mask == edge_mask`: full match + /// - `query_mask` is a subset of `edge_mask`: subset match + /// - `query_mask` and `edge_mask` are disjoint (sharing no required bits): + /// no match + /// - `query_mask == 0` (`CausalMask::None`): matches every edge — there + /// are no required bits, so the predicate is vacuously satisfied. + #[inline(always)] + pub const fn matches_causal(&self, query_mask: u8) -> bool { + let q = query_mask & 0b111; + let edge_mask = ((self.0 >> CAUSAL_SHIFT) & BITS3_MASK) as u8; + (edge_mask & q) == q + } + + /// Type-safe variant of [`Self::matches_causal`] taking a [`CausalMask`]. + #[inline(always)] + pub fn matches_causal_mask(&self, query_mask: CausalMask) -> bool { + self.matches_causal(query_mask as u8) + } + /// Is the S-plane active in the current causal projection? #[inline(always)] pub fn s_active(self) -> bool { (self.0 >> CAUSAL_SHIFT) & 0b100 != 0 } @@ -635,4 +666,91 @@ mod tests { assert_eq!(std::mem::size_of::(), 8, "CausalEdge64 must be exactly 8 bytes"); } + + // ─── matches_causal: query-side Pearl 2³ predicate (TD-INT-7) ──── + + fn make_edge(mask: CausalMask) -> CausalEdge64 { + CausalEdge64::pack( + 10, 20, 30, 200, 200, + mask, 0, InferenceType::Deduction, + PlasticityState::ALL_FROZEN, 0, + ) + } + + #[test] + fn test_matches_causal_full_match() { + // query_mask == edge_mask: must match. + let edge = make_edge(CausalMask::PO); + assert!(edge.matches_causal(CausalMask::PO as u8)); + assert!(edge.matches_causal_mask(CausalMask::PO)); + + let edge_spo = make_edge(CausalMask::SPO); + assert!(edge_spo.matches_causal(CausalMask::SPO as u8)); + } + + #[test] + fn test_matches_causal_subset_match() { + // query_mask is a strict subset of edge_mask: must match. + // SPO (0b111) contains PO (0b011), SO (0b101), SP (0b110), S, P, O. + let edge = make_edge(CausalMask::SPO); + assert!(edge.matches_causal(CausalMask::PO as u8), + "SPO edge should match PO query (PO bits are subset of SPO)"); + assert!(edge.matches_causal(CausalMask::SO as u8), + "SPO edge should match SO query"); + assert!(edge.matches_causal(CausalMask::P as u8), + "SPO edge should match single-plane P query"); + assert!(edge.matches_causal_mask(CausalMask::S)); + + // PO (0b011) contains O (0b001) and P (0b010), but NOT S (0b100). + let edge_po = make_edge(CausalMask::PO); + assert!(edge_po.matches_causal(CausalMask::O as u8)); + assert!(edge_po.matches_causal(CausalMask::P as u8)); + } + + #[test] + fn test_matches_causal_non_match() { + // query_mask requires bits the edge does not have: must NOT match. + // SO edge (0b101) does NOT have the P plane (0b010). + let edge_so = make_edge(CausalMask::SO); + assert!(!edge_so.matches_causal(CausalMask::P as u8)); + assert!(!edge_so.matches_causal(CausalMask::PO as u8), + "SO edge must not match PO query — P bit is missing"); + assert!(!edge_so.matches_causal_mask(CausalMask::SPO), + "SO edge must not match SPO query — P bit is missing"); + + // P-only edge (0b010) does NOT match SO query (0b101). + let edge_p = make_edge(CausalMask::P); + assert!(!edge_p.matches_causal(CausalMask::SO as u8)); + assert!(!edge_p.matches_causal(CausalMask::S as u8)); + assert!(!edge_p.matches_causal(CausalMask::O as u8)); + } + + #[test] + fn test_matches_causal_zero_mask_matches_anything() { + // query_mask == 0 has no required bits → vacuously matches every edge. + // This is the documented semantics: zero is the predicate-true element + // of the bit lattice (no requirements means nothing to fail). + for variant in [ + CausalMask::None, CausalMask::O, CausalMask::P, CausalMask::PO, + CausalMask::S, CausalMask::SO, CausalMask::SP, CausalMask::SPO, + ] { + let edge = make_edge(variant); + assert!(edge.matches_causal(0), + "zero query_mask must match edge with mask {variant:?}"); + assert!(edge.matches_causal_mask(CausalMask::None), + "CausalMask::None query must match edge with mask {variant:?}"); + } + } + + #[test] + fn test_matches_causal_high_bits_ignored() { + // matches_causal must mask query down to the low 3 bits, so callers + // passing a u8 with stray high bits get the same result as passing + // the cleaned value. + let edge = make_edge(CausalMask::PO); + // 0b1111_0011 → low 3 bits = 0b011 = PO. + assert!(edge.matches_causal(0b1111_0011)); + // 0b1111_0100 → low 3 bits = 0b100 = S — not present in PO edge. + assert!(!edge.matches_causal(0b1111_0100)); + } } diff --git a/crates/cognitive-shader-driver/src/cypher_bridge.rs b/crates/cognitive-shader-driver/src/cypher_bridge.rs index 48e73780..9e2c4633 100644 --- a/crates/cognitive-shader-driver/src/cypher_bridge.rs +++ b/crates/cognitive-shader-driver/src/cypher_bridge.rs @@ -23,6 +23,8 @@ //! - Anything else — `StepStatus::Skipped` with "unsupported cypher //! construct" reasoning. No failure: the downstream can plan around it. +use lance_graph_contract::crystal::fingerprint::CrystalFingerprint; +use lance_graph_contract::grammar::context_chain::{ContextChain, DisambiguationResult}; use lance_graph_contract::nars::InferenceType; use lance_graph_contract::orchestration::{ OrchestrationBridge, OrchestrationError, StepDomain, StepStatus, UnifiedStep, @@ -30,6 +32,26 @@ use lance_graph_contract::orchestration::{ use lance_graph_contract::plan::ThinkingContext; use lance_graph_contract::thinking::ThinkingStyle; +/// TD-INT-6 — disambiguation hook for multi-candidate Cypher parses. +/// +/// When a real parser returns N candidate parse trees for an ambiguous +/// query, this helper consults the live `ContextChain` to pick the +/// candidate whose insertion-coherence at position `i` is highest. +/// Today's regex stub returns a single candidate, so this is a dormant +/// call site — wire in place; activation lives at parser commit time. +pub fn disambiguate_parse_candidates( + chain: &ContextChain, + position: usize, + candidates: Vec, +) -> Result { + let result = chain.disambiguate(position, candidates); + if result.escalate_to_llm { + Err(result) + } else { + Ok(result.chosen.clone()) + } +} + /// Bridge for `lg.cypher` step_types. Stateless in Phase 1; an SPO store /// handle slots in here when Phase 2 wires the real parser + BindSpace. pub struct CypherBridge; @@ -219,4 +241,21 @@ mod tests { other => panic!("expected DomainUnavailable, got {:?}", other), } } + + /// TD-INT-6 — empty candidate list escalates. + #[test] + fn disambiguate_empty_candidates_escalates() { + let chain = ContextChain::new(8); + let result = disambiguate_parse_candidates(&chain, 0, Vec::new()); + assert!(result.is_err(), "empty candidates must escalate"); + } + + /// TD-INT-6 — single candidate escalates (margin = 0). + #[test] + fn disambiguate_single_candidate_escalates() { + let chain = ContextChain::new(8); + let cand = CrystalFingerprint::Binary16K(Box::new([0u64; 256])); + let result = disambiguate_parse_candidates(&chain, 0, vec![cand]); + assert!(result.is_err(), "single candidate must escalate"); + } } diff --git a/crates/lance-graph-callcenter/src/drain.rs b/crates/lance-graph-callcenter/src/drain.rs index de8b205b..cd0d50b4 100644 --- a/crates/lance-graph-callcenter/src/drain.rs +++ b/crates/lance-graph-callcenter/src/drain.rs @@ -1,63 +1,329 @@ -//! `DrainTask` — DM-6 scaffold of the callcenter membrane plan. +//! `DrainTask` — IN-bound side of the callcenter membrane (TD-INT-12). //! -//! Future home for the `steering_intent` Lance-read → `UnifiedStep` → -//! `OrchestrationBridge::route()` pipeline. This PR ships only the -//! type shell and a `Poll::Pending` `drain()` method so that -//! `lib.rs` can re-export the name and consumers can start wiring -//! against the surface. The live drain loop lands in a follow-up. +//! Drains pending `ExternalIntent` rows into `OrchestrationBridge::route()`. //! -//! Plan: `.claude/plans/supabase-subscriber-v1.md` § DM-6. +//! # Phase map +//! +//! - **Phase A (this file, realtime feature):** in-memory channel +//! (`tokio::sync::mpsc::UnboundedReceiver`). The membrane +//! pushes intents through a paired `UnboundedSender`; `DrainTask::poll` +//! greedily drains the channel into the wired `OrchestrationBridge`. +//! - **Phase D (deferred):** the channel sender becomes a Lance row poller +//! tailing the `steering_intent` dataset. Public surface is unchanged — +//! `DrainTask` keeps consuming an `mpsc::UnboundedReceiver`; only the +//! producer side swaps to a Lance reader. +//! +//! Without the `realtime` feature, the trivial `Poll::Pending` scaffold is +//! preserved so the non-realtime build keeps compiling — `tokio::sync::mpsc` +//! lives behind the realtime feature flag, and there is no honest IN-bound +//! drain to ship without it. +//! +//! Plan: `.claude/plans/supabase-subscriber-v1.md` § DM-6, TD-INT-12. use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; -/// Background task that drains `steering_intent` rows from Lance and -/// forwards them to the `OrchestrationBridge`. +#[cfg(feature = "realtime")] +use std::sync::Arc; + +#[cfg(feature = "realtime")] +use tokio::sync::mpsc; + +#[cfg(feature = "realtime")] +use lance_graph_contract::external_membrane::ExternalMembrane; +#[cfg(feature = "realtime")] +use lance_graph_contract::orchestration::OrchestrationBridge; + +#[cfg(feature = "realtime")] +use crate::external_intent::ExternalIntent; +#[cfg(feature = "realtime")] +use crate::lance_membrane::LanceMembrane; + +// ───────────────────────────────────────────────────────────────────────────── +// Realtime build — live drain loop (TD-INT-12) +// ───────────────────────────────────────────────────────────────────────────── + +/// Background task that drains `ExternalIntent` rows from an in-memory +/// channel and forwards them through `LanceMembrane::ingest()` → +/// `OrchestrationBridge::route()`. +/// +/// One `DrainTask` per session. Construct with [`drain_channel`], which +/// returns the paired sender and the task. Spawn the task on a tokio +/// runtime; push intents through the sender from the membrane front-end. +/// +/// # Phase map +/// +/// Phase A: producer is the in-process membrane front-end (HTTP / WS / +/// direct API). Phase D: producer is a Lance dataset row poller. Either +/// way, `DrainTask` itself is unchanged. +#[cfg(feature = "realtime")] +pub struct DrainTask { + rx: mpsc::UnboundedReceiver, + bridge: Arc, + membrane: Arc, + /// Monotonic count of rows drained. + drained: u64, +} + +#[cfg(feature = "realtime")] +impl DrainTask { + /// How many `ExternalIntent` rows this task has forwarded so far. + pub fn drained(&self) -> u64 { + self.drained + } +} + +/// Construct an in-memory drain channel and the paired [`DrainTask`]. +/// +/// Push `ExternalIntent`s onto the returned `UnboundedSender`; the task +/// drains them through `membrane.ingest(intent)` to produce a +/// `UnifiedStep`, then forwards that step to `bridge.route()`. +/// +/// When `bridge.route()` returns `Err`, the row is still counted as drained +/// — the canonical bridge owns retry / failure semantics; `DrainTask` is a +/// pump, not a queue manager. +/// +/// # Phase D plan +/// +/// The returned `UnboundedSender` is the seam where Phase D wires the +/// Lance `steering_intent` poller. Until that lands, the membrane front-end +/// keeps the sender and pushes intents directly. +#[cfg(feature = "realtime")] +pub fn drain_channel( + bridge: Arc, + membrane: Arc, +) -> (mpsc::UnboundedSender, DrainTask) { + let (tx, rx) = mpsc::unbounded_channel(); + let task = DrainTask { + rx, + bridge, + membrane, + drained: 0, + }; + (tx, task) +} + +/// `Future` adapter so `DrainTask` composes with `tokio::spawn`. +/// +/// Returns `Poll::Pending` while the channel is open and empty. +/// Returns `Poll::Ready(())` when the channel is closed (all senders +/// dropped) — that is the canonical shutdown signal. +#[cfg(feature = "realtime")] +impl Future for DrainTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + loop { + match self.rx.poll_recv(cx) { + Poll::Ready(Some(intent)) => { + let mut step = self.membrane.ingest(intent); + // Errors are counted as drained — the bridge owns + // retry / dead-letter semantics. DrainTask is a pump. + let _ = self.bridge.route(&mut step); + self.drained += 1; + continue; + } + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + } + } + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Non-realtime build — trivial Pending scaffold (preserves compile) +// ───────────────────────────────────────────────────────────────────────────── + +/// Non-realtime scaffold: returns `Poll::Pending` forever. /// -/// **Scaffold only.** Fields and the drain loop will be populated in -/// the follow-up PR. Ships now so that `LanceMembrane` consumers can -/// import the symbol and so the `pub mod drain` re-export in -/// `lib.rs` is honest about the type existing. +/// The realtime feature owns the live drain loop (`tokio::sync::mpsc` +/// requires tokio). Without realtime there is no in-memory channel and +/// no Lance reader, so there is no honest IN-bound drain to ship. This +/// scaffold exists only to keep the symbol exported on non-realtime +/// builds; consumers should gate their wiring on `feature = "realtime"`. +#[cfg(not(feature = "realtime"))] #[derive(Debug, Default)] pub struct DrainTask { - /// Monotonic count of rows drained (zero until DM-6b lands). drained: u64, } +#[cfg(not(feature = "realtime"))] impl DrainTask { - /// Build an empty drain task. The follow-up PR will add - /// `new(dataset: &LanceDataset, bridge: Arc)`. pub fn new() -> Self { Self::default() } - /// How many `steering_intent` rows this task has forwarded so far. pub fn drained(&self) -> u64 { self.drained } +} + +#[cfg(not(feature = "realtime"))] +impl Future for DrainTask { + type Output = (); - /// Poll the drain loop. - /// - /// Returns `Poll::Pending` unconditionally in the scaffold; the - /// follow-up PR replaces this with the Lance read + route pipeline. - pub fn drain(&mut self, _cx: &mut Context<'_>) -> Poll<()> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } -/// `Future` adapter so the scaffold composes with `tokio::spawn` -/// as soon as the drain body lands. -impl Future for DrainTask { - type Output = (); +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - self.as_mut().drain(cx) +#[cfg(all(test, feature = "realtime"))] +mod realtime_tests { + use super::*; + use lance_graph_contract::{ + external_membrane::ExternalRole, + nars::InferenceType, + orchestration::{OrchestrationError, StepDomain, UnifiedStep}, + plan::ThinkingContext, + thinking::ThinkingStyle, + }; + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::dn_path::DnPath; + + /// Mock `OrchestrationBridge` that always returns `Ok(())` and counts + /// route invocations. Lets tests assert that the bridge actually saw + /// each drained step. + struct CountingBridge { + routed: AtomicU64, + } + + impl CountingBridge { + fn new() -> Self { + Self { + routed: AtomicU64::new(0), + } + } + + fn routed(&self) -> u64 { + self.routed.load(Ordering::Acquire) + } + } + + impl OrchestrationBridge for CountingBridge { + fn route(&self, _step: &mut UnifiedStep) -> Result<(), OrchestrationError> { + self.routed.fetch_add(1, Ordering::AcqRel); + Ok(()) + } + + fn resolve_thinking( + &self, + _style: ThinkingStyle, + _inference_type: InferenceType, + ) -> ThinkingContext { + // Tests never call this; if they do, the panic localises the bug. + unimplemented!("CountingBridge::resolve_thinking is not used in drain tests") + } + + fn domain_available(&self, _domain: StepDomain) -> bool { + true + } + } + + fn make_dn() -> DnPath { + DnPath::parse("/tree/ada/heel/callcenter/hip/v1/branch/agents/twig/card/leaf/abc") + .expect("dn path parses") + } + + fn make_intent() -> ExternalIntent { + ExternalIntent::seed(ExternalRole::CrewaiAgent, make_dn(), b"hello".to_vec()) + } + + fn noop_waker() -> core::task::Waker { + use core::task::{RawWaker, RawWakerVTable, Waker}; + const VTABLE: RawWakerVTable = RawWakerVTable::new( + |_| RawWaker::new(core::ptr::null(), &VTABLE), + |_| {}, + |_| {}, + |_| {}, + ); + // SAFETY: vtable functions are no-ops and never deref the pointer. + unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) } + } + + #[test] + fn drain_processes_intent() { + let bridge = Arc::new(CountingBridge::new()); + let membrane = Arc::new(LanceMembrane::new()); + let (tx, mut task) = drain_channel(bridge.clone(), membrane); + + // Push one intent before polling. + tx.send(make_intent()).expect("send before poll"); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + // Channel is still open after one push, so the future drains the + // pending row and parks at Pending waiting for more. + let poll = Pin::new(&mut task).poll(&mut cx); + assert!(matches!(poll, Poll::Pending), "open channel parks Pending"); + + assert_eq!(task.drained(), 1, "one intent was drained"); + assert_eq!(bridge.routed(), 1, "bridge.route() saw one step"); + } + + #[test] + fn drain_pending_when_empty() { + let bridge = Arc::new(CountingBridge::new()); + let membrane = Arc::new(LanceMembrane::new()); + let (_tx, mut task) = drain_channel(bridge.clone(), membrane); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let poll = Pin::new(&mut task).poll(&mut cx); + assert!(matches!(poll, Poll::Pending), "empty open channel is Pending"); + assert_eq!(task.drained(), 0); + assert_eq!(bridge.routed(), 0); + } + + #[test] + fn drain_completes_on_channel_close() { + let bridge = Arc::new(CountingBridge::new()); + let membrane = Arc::new(LanceMembrane::new()); + let (tx, mut task) = drain_channel(bridge.clone(), membrane); + + // Drop the only sender — the channel closes and the future should + // return Poll::Ready(()). + drop(tx); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let poll = Pin::new(&mut task).poll(&mut cx); + assert!( + matches!(poll, Poll::Ready(())), + "closed channel completes the drain future" + ); + } + + #[test] + fn drain_processes_multiple_intents_greedily() { + let bridge = Arc::new(CountingBridge::new()); + let membrane = Arc::new(LanceMembrane::new()); + let (tx, mut task) = drain_channel(bridge.clone(), membrane); + + // Push three intents, then close the channel by dropping the sender. + tx.send(make_intent()).unwrap(); + tx.send(make_intent()).unwrap(); + tx.send(make_intent()).unwrap(); + drop(tx); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + // One poll should drain all three (greedy loop) and then observe + // the closed channel for completion. + let poll = Pin::new(&mut task).poll(&mut cx); + assert!(matches!(poll, Poll::Ready(())), "drains all then completes"); + assert_eq!(task.drained(), 3); + assert_eq!(bridge.routed(), 3); } } -#[cfg(test)] -mod tests { +#[cfg(all(test, not(feature = "realtime")))] +mod scaffold_tests { use super::*; use core::task::{RawWaker, RawWakerVTable, Waker}; @@ -68,8 +334,7 @@ mod tests { |_| {}, |_| {}, ); - // SAFETY: The vtable functions are all no-ops and never - // dereference the pointer; null is safe here. + // SAFETY: vtable functions are no-ops and never deref the pointer. unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) } } @@ -80,10 +345,11 @@ mod tests { } #[test] - fn drain_is_pending_in_scaffold() { + fn scaffold_is_pending() { let mut task = DrainTask::new(); let waker = noop_waker(); let mut cx = Context::from_waker(&waker); - assert!(matches!(task.drain(&mut cx), Poll::Pending)); + let poll = Pin::new(&mut task).poll(&mut cx); + assert!(matches!(poll, Poll::Pending)); } } diff --git a/crates/lance-graph-callcenter/src/lance_membrane.rs b/crates/lance-graph-callcenter/src/lance_membrane.rs index 0110af66..5dbf5f62 100644 --- a/crates/lance-graph-callcenter/src/lance_membrane.rs +++ b/crates/lance-graph-callcenter/src/lance_membrane.rs @@ -38,10 +38,12 @@ use tokio::sync::watch; #[cfg(feature = "realtime")] use crate::version_watcher::LanceVersionWatcher; +use std::sync::Arc; + use lance_graph_contract::{ a2a_blackboard::ExpertId, cognitive_shader::{MetaWord, ShaderBus}, - external_membrane::{CommitFilter, ExternalEventKind, ExternalMembrane}, + external_membrane::{CommitFilter, ExternalEventKind, ExternalMembrane, MembraneGate}, faculty::FacultyRole, orchestration::{StepStatus, UnifiedStep}, }; @@ -63,6 +65,14 @@ pub struct LanceMembrane { current_scent: AtomicU64, current_rationale_phase: AtomicBool, // MM-CoT stage: true = Stage 1 rationale version: AtomicU64, + /// Server-side fan-out filter (TD-INT-13). Default-empty = pass all. + /// Applied INSIDE `project()` before `watcher.bump`, so subscribers + /// only see rows matching the filter. + server_filter: RwLock, + /// Optional fan-out gate (TD-INT-9). RBAC, multi-tenant scope, etc. + /// Default `None` = no gating. When set, `should_emit` is consulted + /// before `watcher.bump`. + gate: RwLock>>, /// Fan-out watcher for projected cognitive events ([realtime] feature only). #[cfg(feature = "realtime")] watcher: LanceVersionWatcher, @@ -77,11 +87,31 @@ impl LanceMembrane { current_scent: AtomicU64::new(0), current_rationale_phase: AtomicBool::new(false), version: AtomicU64::new(0), + server_filter: RwLock::new(CommitFilter::default()), + gate: RwLock::new(None), #[cfg(feature = "realtime")] watcher: LanceVersionWatcher::default(), } } + /// Install a server-side fan-out filter (TD-INT-13). Subscribers only + /// see rows matching the filter; rows that don't match are still + /// returned by `project()` but not bumped to the watcher. + pub fn set_server_filter(&self, filter: CommitFilter) { + *self.server_filter.write().expect("server_filter poisoned") = filter; + } + + /// Install a fan-out gate (TD-INT-9). RBAC, multi-tenant scope, + /// custom policies. Default = no gate (allow all). + pub fn set_gate(&self, gate: Arc) { + *self.gate.write().expect("gate poisoned") = Some(gate); + } + + /// Remove any installed gate, reverting to allow-all fan-out. + pub fn clear_gate(&self) { + *self.gate.write().expect("gate poisoned") = None; + } + /// Current Lance version counter (monotonic; ticks on each CollapseGate /// Persist). Phase D wires this to the Lance dataset version. pub fn version(&self) -> u64 { @@ -163,9 +193,32 @@ impl ExternalMembrane for LanceMembrane { rationale_phase: self.current_rationale_phase.load(Ordering::Relaxed), }; + // TD-INT-13 + TD-INT-9: server-side fan-out gate. + // 1. CommitFilter — scalar predicate match + // 2. MembraneGate — RBAC / multi-tenant / custom policy + // If either rejects, we still return the row but skip the watcher + // bump. The row is the projection; only the fan-out is gated. + let actor_id = expert as u64; // v1: actor = expert; refine when UNKNOWN-4 lands + let style_ord = meta.thinking(); + let pass_filter = self + .server_filter + .read() + .map(|f| f.matches(actor_id, meta.free_e(), style_ord, bus.gate.is_flow())) + .unwrap_or(true); + let pass_gate = self + .gate + .read() + .ok() + .and_then(|g| g.as_ref().map(|g| g.should_emit(role, faculty, expert, bus.gate.is_flow()))) + .unwrap_or(true); + // DM-4: fan out to all current subscribers (supabase-shape realtime). #[cfg(feature = "realtime")] - self.watcher.bump(row.clone()); + if pass_filter && pass_gate { + self.watcher.bump(row.clone()); + } + #[cfg(not(feature = "realtime"))] + let _ = (pass_filter, pass_gate); // silence unused without realtime row } @@ -303,6 +356,69 @@ mod tests { assert!(rx.try_recv().is_err()); } + /// TD-INT-9: gate denies → row still returned, fan-out skipped. + #[test] + fn gate_denies_skips_emit_but_returns_row() { + struct DenyAll; + impl MembraneGate for DenyAll { + fn should_emit(&self, _: u8, _: u8, _: u16, _: bool) -> bool { false } + } + + let m = LanceMembrane::new(); + m.set_gate(Arc::new(DenyAll)); + let intent = ExternalIntent::seed(ExternalRole::Rag, make_dn(), vec![]); + m.ingest(intent); + + let bus = ShaderBus::empty(); + let meta = MetaWord::new(5, 3, 200, 150, 10); + let row = m.project(&bus, meta); + // Row is still returned (caller may want it for metrics). + assert_eq!(row.external_role, ExternalRole::Rag as u8); + assert_eq!(row.thinking, 5); + // Fan-out is skipped — verified via watcher tests in realtime feature. + } + + /// TD-INT-13: server-side filter mismatch → fan-out skipped. + #[test] + fn filter_mismatch_skips_emit() { + let m = LanceMembrane::new(); + // Filter: only style_ordinal == 99 + m.set_server_filter(CommitFilter { + style_ordinal: Some(99), + ..Default::default() + }); + let intent = ExternalIntent::seed(ExternalRole::Rag, make_dn(), vec![]); + m.ingest(intent); + + let bus = ShaderBus::empty(); + // Project with style 5 — should not match the filter + let meta = MetaWord::new(5, 3, 200, 150, 10); + let row = m.project(&bus, meta); + assert_eq!(row.thinking, 5); + // Row returned, fan-out gated on style_ordinal mismatch. + } + + /// TD-INT-13: server-side filter match → fan-out happens. + #[test] + fn filter_match_passes_emit() { + let m = LanceMembrane::new(); + // Filter: style_ordinal == 5 + m.set_server_filter(CommitFilter { + style_ordinal: Some(5), + max_free_energy: Some(100), + ..Default::default() + }); + let intent = ExternalIntent::seed(ExternalRole::Rag, make_dn(), vec![]); + m.ingest(intent); + + let bus = ShaderBus::empty(); + let meta = MetaWord::new(5, 3, 200, 150, 10); + let row = m.project(&bus, meta); + assert_eq!(row.thinking, 5); + assert_eq!(row.free_e, 10); + // Free_e=10 ≤ max=100 ✓; style=5 == 5 ✓ → fan-out passes. + } + /// Phase D (realtime feature): subscribe() → project() → rx.borrow() sees the row. #[cfg(feature = "realtime")] #[test] diff --git a/crates/lance-graph-contract/src/external_membrane.rs b/crates/lance-graph-contract/src/external_membrane.rs index 611f33ff..e2447349 100644 --- a/crates/lance-graph-contract/src/external_membrane.rs +++ b/crates/lance-graph-contract/src/external_membrane.rs @@ -83,6 +83,53 @@ pub struct CommitFilter { pub is_commit: Option, } +impl CommitFilter { + /// Server-side predicate match. Returns true if the row's scalars + /// satisfy every set predicate. Unset fields don't filter. + /// Used by `LanceMembrane::project` to gate fan-out before + /// `LanceVersionWatcher::bump`, so subscribers only see rows + /// they care about (TD-INT-13). + pub fn matches( + &self, + actor_id: u64, + free_energy: u8, + style_ordinal: u8, + is_commit: bool, + ) -> bool { + if let Some(want) = self.actor_id { if actor_id != want { return false; } } + if let Some(max) = self.max_free_energy { if free_energy > max { return false; } } + if let Some(want) = self.style_ordinal { if style_ordinal != want { return false; } } + if let Some(want) = self.is_commit { if is_commit != want { return false; } } + true + } +} + +/// Optional gate that the membrane consults before fanning a projection +/// out to subscribers. RBAC, custom policies, multi-tenant scope filters +/// — all impl this trait. The contract crate stays zero-dep; concrete +/// implementations live in `lance-graph-rbac`, SMB-side custom code, etc. +/// +/// `should_emit` returns `true` to let the projection through, `false` +/// to suppress. The underlying `project()` call still returns the row +/// (callers may want it for metrics) — only the fan-out is gated. +pub trait MembraneGate: Send + Sync { + fn should_emit( + &self, + external_role: u8, + faculty_role: u8, + expert_id: u16, + gate_commit: bool, + ) -> bool; +} + +/// No-op gate that always allows. Default. +pub struct AllowAllGate; + +impl MembraneGate for AllowAllGate { + #[inline] + fn should_emit(&self, _: u8, _: u8, _: u16, _: bool) -> bool { true } +} + /// The typed boundary between the canonical cognitive substrate and /// the external callcenter surface. /// diff --git a/crates/lance-graph-contract/src/grammar/free_energy.rs b/crates/lance-graph-contract/src/grammar/free_energy.rs index d1bafd4c..3729e847 100644 --- a/crates/lance-graph-contract/src/grammar/free_energy.rs +++ b/crates/lance-graph-contract/src/grammar/free_energy.rs @@ -227,6 +227,7 @@ mod tests { tekamolo: TekamoloSlots::default(), wechsel: vec![], coverage: 0.0, + missing_required: vec![], } } diff --git a/crates/lance-graph-contract/src/grammar/ticket.rs b/crates/lance-graph-contract/src/grammar/ticket.rs index 6cd552db..08036470 100644 --- a/crates/lance-graph-contract/src/grammar/ticket.rs +++ b/crates/lance-graph-contract/src/grammar/ticket.rs @@ -65,6 +65,10 @@ pub struct FailureTicket { pub tekamolo: TekamoloSlots, pub wechsel: Vec, pub coverage: f32, + /// Required predicates that were absent at commit time. Empty for + /// grammar-parse failures; populated for schema-validation failures + /// (TD-INT-8). See [`Self::missing_required`]. + pub missing_required: Vec<&'static str>, } impl FailureTicket { @@ -78,6 +82,35 @@ impl FailureTicket { .map(|c| !c.is_resolved(0.75)) .unwrap_or(false) } + + /// Construct a FailureTicket for a schema-validation miss: one or + /// more Required predicates are absent from a triple set being + /// committed (TD-INT-8). The list of missing predicate names is + /// preserved verbatim so the LLM/operator can address each one. + /// `recommended_next` is `Abduction` — the system is asking + /// "what value should fill this slot?", which is the abductive case. + pub fn missing_required(missing: Vec<&'static str>) -> Self { + Self { + partial_parse: PartialParse { + resolved_tokens: Vec::new(), + unresolved_tokens: Vec::new(), + coverage: 0.0, + }, + attempted_inference: NarsInference::Deduction, + recommended_next: NarsInference::Abduction, + causal_ambiguity: None, + tekamolo: TekamoloSlots::default(), + wechsel: Vec::new(), + coverage: 0.0, + missing_required: missing, + } + } + + /// Iterator over predicate names that triggered a missing-required + /// FailureTicket. Empty when the ticket comes from grammar parsing. + pub fn missing_predicates(&self) -> impl Iterator + '_ { + self.missing_required.iter().copied() + } } #[cfg(test)] @@ -98,6 +131,7 @@ mod tests { tekamolo: TekamoloSlots::default(), wechsel: vec![], coverage: 1.0, + missing_required: vec![], }; assert!(!t.needs_llm(0.9)); } @@ -116,6 +150,7 @@ mod tests { tekamolo: TekamoloSlots::default(), wechsel: vec![], coverage: 0.33, + missing_required: vec![], }; assert!(t.needs_llm(0.9)); } @@ -126,4 +161,20 @@ mod tests { assert_eq!(c.plausible_count(), 2); assert!(!c.is_resolved(0.75)); } + + #[test] + fn missing_required_constructor_preserves_predicate_names() { + let t = FailureTicket::missing_required(vec!["customer_name", "tax_id"]); + let m: Vec<&'static str> = t.missing_predicates().collect(); + assert_eq!(m, vec!["customer_name", "tax_id"]); + assert_eq!(t.recommended_next, NarsInference::Abduction); + assert_eq!(t.coverage, 0.0); + assert!(t.needs_llm(0.9), "schema miss must escalate"); + } + + #[test] + fn missing_required_constructor_empty_for_no_misses() { + let t = FailureTicket::missing_required(vec![]); + assert_eq!(t.missing_predicates().count(), 0); + } } diff --git a/crates/lance-graph-contract/src/lib.rs b/crates/lance-graph-contract/src/lib.rs index 1b107180..04fe044e 100644 --- a/crates/lance-graph-contract/src/lib.rs +++ b/crates/lance-graph-contract/src/lib.rs @@ -63,3 +63,4 @@ pub mod tax; pub mod reasoning; pub mod property; pub mod ontology; +pub mod sla; diff --git a/crates/lance-graph-contract/src/property.rs b/crates/lance-graph-contract/src/property.rs index 7fa67fe4..b940e885 100644 --- a/crates/lance-graph-contract/src/property.rs +++ b/crates/lance-graph-contract/src/property.rs @@ -50,6 +50,8 @@ pub struct PropertySpec { /// Below this floor, Required properties trigger FailureTicket. /// None = no floor check (typical for Free properties). pub nars_floor: Option<(u8, u8)>, + /// What kind of value this property holds (LF-21). + pub semantic_type: SemanticType, } impl PropertySpec { @@ -61,6 +63,7 @@ impl PropertySpec { kind: PropertyKind::Required, codec_route: CodecRoute::Passthrough, nars_floor: Some((128, 128)), + semantic_type: SemanticType::PlainText, } } @@ -72,6 +75,7 @@ impl PropertySpec { kind: PropertyKind::Optional, codec_route, nars_floor: None, + semantic_type: SemanticType::PlainText, } } @@ -83,9 +87,15 @@ impl PropertySpec { kind: PropertyKind::Free, codec_route: CodecRoute::CamPq, nars_floor: None, + semantic_type: SemanticType::PlainText, } } + pub const fn with_semantic_type(mut self, st: SemanticType) -> Self { + self.semantic_type = st; + self + } + /// Override the NARS truth floor. pub const fn with_nars_floor(mut self, frequency: u8, confidence: u8) -> Self { self.nars_floor = Some((frequency, confidence)); @@ -159,11 +169,12 @@ impl PropertySchema { pub struct Schema { pub name: &'static str, pub properties: Vec, + pub view: Option, } impl Schema { pub fn builder(name: &'static str) -> SchemaBuilder { - SchemaBuilder { name, properties: Vec::new() } + SchemaBuilder { name, properties: Vec::new(), view: None } } pub fn get(&self, predicate: &str) -> Option<&PropertySpec> { @@ -196,6 +207,7 @@ impl Schema { pub struct SchemaBuilder { name: &'static str, properties: Vec, + view: Option, } impl SchemaBuilder { @@ -229,8 +241,14 @@ impl SchemaBuilder { self } + /// Attach an ObjectView for outside-BBB rendering (LF-22). + pub fn view(mut self, view: ObjectView) -> Self { + self.view = Some(view); + self + } + pub fn build(self) -> Schema { - Schema { name: self.name, properties: self.properties } + Schema { name: self.name, properties: self.properties, view: self.view } } } @@ -652,6 +670,128 @@ impl Default for Marking { fn default() -> Self { Marking::Internal } } +// ═══════════════════════════════════════════════════════════════════════════ +// SEMANTIC TYPE (LF-21 — SMB REQUEST) +// ═══════════════════════════════════════════════════════════════════════════ + +/// Semantic type annotation on a property. Tells the outside-BBB surface +/// what kind of value this property holds, enabling format-aware validation, +/// display formatting, and search indexing without inspecting the raw bytes. +/// +/// SMB use cases: `iban` (DE89370400440532013000), `currency` (EUR 1234.56), +/// `email`, `phone`, `date` (geburtsdatum), `address`, `tax_id` (umsatzsteuer-id). +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum SemanticType { + PlainText, + Iban, + Currency(&'static str), + Email, + Phone, + Date(DatePrecision), + Geo(GeoFormat), + Address, + File(&'static str), + Image, + Url, + TaxId, + CustomerId, + InvoiceNumber, +} + +impl Default for SemanticType { + fn default() -> Self { SemanticType::PlainText } +} + +/// Date granularity for `SemanticType::Date`. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum DatePrecision { + Day, + Month, + Year, + DateTime, +} + +/// Geo coordinate format for `SemanticType::Geo`. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum GeoFormat { + LatLon, + Wgs84, + PlusCode, +} + +// ═══════════════════════════════════════════════════════════════════════════ +// OBJECT VIEW (LF-22 — SMB REQUEST) +// ═══════════════════════════════════════════════════════════════════════════ + +/// Rendering descriptor for entity views outside the BBB. +/// Tells a UI which properties to show at each zoom level +/// without bespoke per-entity-type rendering code. +#[derive(Clone, Debug)] +pub struct ObjectView { + pub card: &'static [&'static str], + pub detail: &'static [&'static str], + pub summary_template: &'static str, +} + +impl ObjectView { + pub const fn new( + card: &'static [&'static str], + detail: &'static [&'static str], + summary_template: &'static str, + ) -> Self { + Self { card, detail, summary_template } + } +} + +// ═══════════════════════════════════════════════════════════════════════════ +// AUDIT ENTRY (LF-90 — SMB REQUEST) +// ═══════════════════════════════════════════════════════════════════════════ + +/// Append-only audit trail entry. Outside the BBB this is a compliance +/// record; inside it feeds CausalEdge64 provenance bits. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct AuditEntry { + pub actor: u64, + pub action_id: u64, + pub action_kind: AuditAction, + pub timestamp_ms: u64, + pub predicate_target: &'static str, + pub signature: [u8; 64], +} + +/// What kind of mutation the audit entry records. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum AuditAction { + Create, + Update, + Delete, + Read, + Export, + Import, + Approve, + Reject, +} + +/// Trait for an append-only audit log. Implementations back this with +/// Lance versioned dataset, Arrow Flight, or any persistent store. +pub trait AuditLog: Send + Sync { + type Error: Send + 'static; + + fn append(&self, entry: AuditEntry) -> Result<(), Self::Error>; + + fn entries_for_entity( + &self, + entity_type: &str, + entity_id: u64, + ) -> Result, Self::Error>; + + fn entries_by_actor( + &self, + actor: u64, + since_ms: u64, + ) -> Result, Self::Error>; +} + impl Marking { pub fn most_restrictive(markings: &[Marking]) -> Marking { markings.iter().copied().max().unwrap_or(Marking::Public) diff --git a/crates/lance-graph-contract/src/sla.rs b/crates/lance-graph-contract/src/sla.rs new file mode 100644 index 00000000..66ca741d --- /dev/null +++ b/crates/lance-graph-contract/src/sla.rs @@ -0,0 +1,206 @@ +//! Outside-BBB integration types for SLA contracts and multi-tenant +//! isolation (LF-91, LF-92). +//! +//! These types belong to the agnostic surface that external consumers +//! (REST adaptors, gRPC bridges, downstream apps) bind against without +//! knowing anything about the LanceMembrane internals. They are pure +//! data — no method wiring elsewhere in the contract crate. +//! +//! # LF-91 — SLA policy +//! +//! `SlaPolicy` declares the latency / freshness / priority envelope a +//! query or projection commits to honor. Const constructors keep the +//! type usable in `&'static` schemas; `SlaPolicy::STANDARD` and +//! `SlaPolicy::INTERACTIVE` are the two pre-baked tiers. +//! +//! # LF-92 — Multi-tenant isolation +//! +//! `TenantId` is a stable u64 embedded in CommitFilter and AuditEntry +//! signatures so cross-tenant data never leaks through a shared +//! LanceMembrane. `TenantScope` narrows a query to one tenant +//! (`Single`), a federated set (`Multi`), or `All` (admin / cross- +//! tenant analytics, requires policy override at the bridge). + +// ═══════════════════════════════════════════════════════════════════════════ +// LF-91 — SLA POLICY +// ═══════════════════════════════════════════════════════════════════════════ + +/// Service-level objective scope. Tells external consumers what +/// guarantees a query / projection commits to honor. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct SlaPolicy { + pub max_latency_ms: u32, + pub min_freshness_ms: u32, // staleness ceiling + pub priority: SlaPriority, +} + +/// Priority tier ordering for SLA scheduling. `Background` is the +/// lowest priority, `Urgent` the highest. `PartialOrd`/`Ord` derived +/// from declaration order. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum SlaPriority { + Background, + Standard, + Interactive, + Urgent, +} + +impl SlaPolicy { + /// Standard tier — 1 s latency budget, 1 min staleness ceiling. + /// Default for batch / dashboard / non-interactive paths. + pub const STANDARD: SlaPolicy = SlaPolicy { + max_latency_ms: 1_000, + min_freshness_ms: 60_000, + priority: SlaPriority::Standard, + }; + + /// Interactive tier — 100 ms latency budget, 1 s staleness ceiling. + /// For user-facing chat / search / autocomplete paths. + pub const INTERACTIVE: SlaPolicy = SlaPolicy { + max_latency_ms: 100, + min_freshness_ms: 1_000, + priority: SlaPriority::Interactive, + }; + + pub const fn new(max_latency_ms: u32, min_freshness_ms: u32, priority: SlaPriority) -> Self { + Self { max_latency_ms, min_freshness_ms, priority } + } +} + +impl Default for SlaPolicy { + fn default() -> Self { SlaPolicy::STANDARD } +} + +// ═══════════════════════════════════════════════════════════════════════════ +// LF-92 — MULTI-TENANT ISOLATION +// ═══════════════════════════════════════════════════════════════════════════ + +/// Tenant identifier. Stable across queries / projections; +/// embedded in CommitFilter and AuditEntry signatures so +/// cross-tenant data never leaks through a shared LanceMembrane. +pub type TenantId = u64; + +/// Scope a query or projection to one or more tenants. +/// Single = strict isolation; Multi = federated read with +/// per-tenant marking applied to each row. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum TenantScope { + Single(TenantId), + Multi(Vec), + All, // admin / cross-tenant analytics — requires policy override +} + +impl TenantScope { + pub fn contains(&self, tenant: TenantId) -> bool { + match self { + Self::Single(t) => *t == tenant, + Self::Multi(ts) => ts.contains(&tenant), + Self::All => true, + } + } + + pub fn as_slice(&self) -> &[TenantId] { + match self { + Self::Single(t) => std::slice::from_ref(t), + Self::Multi(ts) => ts.as_slice(), + Self::All => &[], + } + } +} + +impl Default for TenantScope { + fn default() -> Self { + Self::All // unrestricted by default; CommitFilter narrows. + } +} + +// ═══════════════════════════════════════════════════════════════════════════ +// TESTS +// ═══════════════════════════════════════════════════════════════════════════ + +#[cfg(test)] +mod tests { + use super::*; + + // ── LF-91 SLA policy ── + + #[test] + fn sla_policy_standard_constants() { + let s = SlaPolicy::STANDARD; + assert_eq!(s.max_latency_ms, 1_000); + assert_eq!(s.min_freshness_ms, 60_000); + assert_eq!(s.priority, SlaPriority::Standard); + } + + #[test] + fn sla_policy_interactive_constants() { + let s = SlaPolicy::INTERACTIVE; + assert_eq!(s.max_latency_ms, 100); + assert_eq!(s.min_freshness_ms, 1_000); + assert_eq!(s.priority, SlaPriority::Interactive); + } + + #[test] + fn sla_policy_default_is_standard() { + assert_eq!(SlaPolicy::default(), SlaPolicy::STANDARD); + } + + #[test] + fn sla_priority_ordering() { + // Declaration order: Background < Standard < Interactive < Urgent. + assert!(SlaPriority::Background < SlaPriority::Standard); + assert!(SlaPriority::Standard < SlaPriority::Interactive); + assert!(SlaPriority::Interactive < SlaPriority::Urgent); + assert!(SlaPriority::Background < SlaPriority::Urgent); + } + + #[test] + fn sla_policy_const_new() { + const CUSTOM: SlaPolicy = + SlaPolicy::new(50, 500, SlaPriority::Urgent); + assert_eq!(CUSTOM.max_latency_ms, 50); + assert_eq!(CUSTOM.min_freshness_ms, 500); + assert_eq!(CUSTOM.priority, SlaPriority::Urgent); + } + + // ── LF-92 multi-tenant isolation ── + + #[test] + fn tenant_scope_contains_single() { + let s = TenantScope::Single(42); + assert!(s.contains(42)); + assert!(!s.contains(7)); + } + + #[test] + fn tenant_scope_contains_multi() { + let s = TenantScope::Multi(vec![1, 2, 3]); + assert!(s.contains(1)); + assert!(s.contains(3)); + assert!(!s.contains(4)); + } + + #[test] + fn tenant_scope_contains_all() { + let s = TenantScope::All; + assert!(s.contains(0)); + assert!(s.contains(u64::MAX)); + } + + #[test] + fn tenant_scope_default_is_all() { + assert_eq!(TenantScope::default(), TenantScope::All); + } + + #[test] + fn tenant_scope_as_slice() { + let single = TenantScope::Single(7); + assert_eq!(single.as_slice(), &[7][..]); + + let multi = TenantScope::Multi(vec![1, 2, 3]); + assert_eq!(multi.as_slice(), &[1, 2, 3][..]); + + let all = TenantScope::All; + assert!(all.as_slice().is_empty()); + } +} diff --git a/crates/lance-graph-planner/src/cache/convergence.rs b/crates/lance-graph-planner/src/cache/convergence.rs index 64b33bdf..4a2e42b7 100644 --- a/crates/lance-graph-planner/src/cache/convergence.rs +++ b/crates/lance-graph-planner/src/cache/convergence.rs @@ -20,6 +20,52 @@ use super::kv_bundle::HeadPrint; use super::nars_engine::{SpoHead, MASK_SPO, CausalEdge64}; +use ndarray::hpc::palette_distance::{Palette, DistanceMatrix, SpoDistanceMatrices}; +use ndarray::hpc::bgz17_bridge::SpoBase17; + +/// Per-plane palette distance context (TD-INT-5). +/// +/// Wraps ndarray's `SpoDistanceMatrices` (pre-computed 256×256 per-plane +/// L1 distance tables). All comparison algebra lives in ndarray; this +/// struct is a session-scoped handle that the planner cache and cascade +/// use to compare triplets on individual role planes (subject-only, +/// predicate-only, etc.) without doing bit ops in lance-graph. +/// +/// Build once from the palette codebooks; compare in O(1) per pair. +pub struct PlaneDistance { + matrices: SpoDistanceMatrices, +} + +impl PlaneDistance { + /// Build from three palettes (one per S/P/O plane). + pub fn build(s_pal: &Palette, p_pal: &Palette, o_pal: &Palette) -> Self { + Self { matrices: SpoDistanceMatrices::build(s_pal, p_pal, o_pal) } + } + + /// Combined S+P+O distance. O(1): three table lookups. + #[inline] + pub fn spo_distance(&self, a: &SpoHead, b: &SpoHead) -> u32 { + self.matrices.spo_distance(a.s_idx, a.p_idx, a.o_idx, b.s_idx, b.p_idx, b.o_idx) + } + + /// Subject-plane only distance. O(1): one table lookup. + #[inline] + pub fn subject_distance(&self, a: &SpoHead, b: &SpoHead) -> u16 { + self.matrices.subject.distance(a.s_idx, b.s_idx) + } + + /// Predicate-plane only distance. O(1): one table lookup. + #[inline] + pub fn predicate_distance(&self, a: &SpoHead, b: &SpoHead) -> u16 { + self.matrices.predicate.distance(a.p_idx, b.p_idx) + } + + /// Object-plane only distance. O(1): one table lookup. + #[inline] + pub fn object_distance(&self, a: &SpoHead, b: &SpoHead) -> u16 { + self.matrices.object.distance(a.o_idx, b.o_idx) + } +} /// Convert an SPO triplet (as strings) into a HeadPrint fingerprint. /// @@ -165,6 +211,34 @@ pub fn episodes_to_palette_layers( mod tests { use super::*; + #[test] + fn test_plane_distance_subject_only() { + // Build a 256-entry palette (production size) from spread Base17 patterns + let patterns: Vec = (0..256) + .map(|i| { + let mut b = ndarray::hpc::bgz17_bridge::Base17::zero(); + b.dims[0] = (i as i16).wrapping_mul(127); + b.dims[1] = (i as i16).wrapping_mul(31); + b + }) + .collect(); + let pal = Palette::build(&patterns, 256, 1); + let pd = PlaneDistance::build(&pal, &pal, &pal); + + // Same palette index → zero distance + let a = headprint_to_spo(&triplet_to_headprint("Alice", "knows", "Bob"), 0.9, 0.8); + assert_eq!(pd.subject_distance(&a, &a), 0); + assert_eq!(pd.spo_distance(&a, &a), 0); + + // Different triplets → likely nonzero distance + let b = headprint_to_spo(&triplet_to_headprint("Zephyr", "loves", "Qux"), 0.9, 0.8); + let combined = pd.spo_distance(&a, &b); + let sub_only = pd.subject_distance(&a, &b); + // Subject-only ≤ combined (since combined adds P + O) + assert!(sub_only as u32 <= combined, + "subject-only {} should be <= combined {}", sub_only, combined); + } + #[test] fn test_triplet_to_headprint() { let fp = triplet_to_headprint("Claude", "reasons_like", "Opus4.6"); diff --git a/crates/lance-graph/Cargo.toml b/crates/lance-graph/Cargo.toml index 10806f9d..20aa2843 100644 --- a/crates/lance-graph/Cargo.toml +++ b/crates/lance-graph/Cargo.toml @@ -32,6 +32,7 @@ datafusion-sql = "51" datafusion-functions-aggregate = "51" futures = "0.3" lance-graph-catalog = { path = "../lance-graph-catalog", version = "0.5.4" } +lance-graph-contract = { path = "../lance-graph-contract" } lance = "2" lance-linalg = "2" lance-namespace = "2" diff --git a/crates/lance-graph/src/graph/spo/builder.rs b/crates/lance-graph/src/graph/spo/builder.rs index af1cbf15..96e1d9ec 100644 --- a/crates/lance-graph/src/graph/spo/builder.rs +++ b/crates/lance-graph/src/graph/spo/builder.rs @@ -6,10 +6,32 @@ //! An SPO record packs Subject, Predicate, Object fingerprints together //! with a truth value into a structure that can be stored in an SpoStore //! and queried via ANN search. +//! +//! The builder has two operational modes: +//! +//! 1. **Stateless** (legacy / happy path) — call associated functions +//! such as [`SpoBuilder::build_edge`] to produce a single +//! [`SpoRecord`] for direct insertion into an [`SpoStore`]. +//! 2. **Stateful + schema-validated** (TD-INT-8) — `SpoBuilder::new()` +//! returns an instance that accumulates `(predicate_name, key, record)` +//! triples via [`SpoBuilder::stage`]. When attached to a +//! [`Schema`](lance_graph_contract::property::Schema) via +//! [`SpoBuilder::with_schema`], the staged set is validated against +//! the schema's Required predicates on +//! [`SpoBuilder::commit_validated`]. Missing predicates produce a +//! [`FailureTicket`](lance_graph_contract::grammar::FailureTicket) +//! instead of a silent insertion. The predicate fingerprint is a +//! hash that loses the original label, so each `stage` call also +//! carries the predicate name explicitly so validation can compare +//! against the schema. + +use lance_graph_contract::grammar::FailureTicket; +use lance_graph_contract::property::Schema; use crate::graph::fingerprint::{Fingerprint, FINGERPRINT_WORDS}; use crate::graph::sparse::{pack_axes, Bitmap, BITMAP_WORDS}; +use super::store::SpoStore; use super::truth::TruthValue; /// An SPO record representing a single edge in the graph. @@ -30,10 +52,93 @@ pub struct SpoRecord { pub truth: TruthValue, } +/// A staged triple held by an `SpoBuilder` between `stage` and +/// `commit_validated`. Carries the predicate name (since the fingerprint +/// loses the label after hashing) and the destination key in the store. +#[derive(Debug, Clone)] +struct StagedTriple { + predicate_name: &'static str, + key: u64, + record: SpoRecord, +} + /// Builder for constructing SPO edge records. -pub struct SpoBuilder; +/// +/// Has two modes — see the module-level docs. +#[derive(Default)] +pub struct SpoBuilder { + schema: Option, + staged: Vec, +} impl SpoBuilder { + /// Create a fresh builder with no schema and an empty staging area. + pub fn new() -> Self { + Self { schema: None, staged: Vec::new() } + } + + /// Attach a [`Schema`] to validate Required predicates against on + /// [`Self::commit_validated`]. Without this, validation is a no-op. + pub fn with_schema(mut self, schema: Schema) -> Self { + self.schema = Some(schema); + self + } + + /// Stage a triple for later validated commit. The `predicate_name` + /// is the original predicate label (must match a name in the + /// attached schema for validation to recognize it). `key` is where + /// the record will be inserted in the store when the batch passes + /// validation. + pub fn stage( + &mut self, + predicate_name: &'static str, + key: u64, + record: SpoRecord, + ) -> &mut Self { + self.staged.push(StagedTriple { predicate_name, key, record }); + self + } + + /// Return the list of Required predicates from the attached schema + /// that are NOT present in the currently staged triples. Empty when + /// (a) no schema is attached, or (b) all Required predicates are + /// staged. + pub fn validate(&self) -> Vec<&'static str> { + let Some(schema) = self.schema.as_ref() else { + return Vec::new(); + }; + let present: Vec<&str> = + self.staged.iter().map(|t| t.predicate_name).collect(); + schema.validate(&present) + } + + /// Validate against the attached schema and, if valid, insert all + /// staged triples into `store`. Returns the number of inserted + /// records on success. + /// + /// On failure (one or more Required predicates missing) returns a + /// [`FailureTicket`] carrying the missing predicate names; nothing + /// is inserted and the staged set is preserved for retry. + pub fn commit_validated( + &mut self, + store: &mut SpoStore, + ) -> Result { + let missing = self.validate(); + if !missing.is_empty() { + return Err(FailureTicket::missing_required(missing)); + } + let n = self.staged.len(); + for staged in self.staged.drain(..) { + store.insert(staged.key, &staged.record); + } + Ok(n) + } + + /// Number of currently-staged triples (mostly for tests / inspection). + pub fn staged_len(&self) -> usize { + self.staged.len() + } + /// Build an edge record from S, P, O fingerprints and a truth value. /// /// The packed bitmap is the OR of all three fingerprints, used as @@ -116,4 +221,109 @@ mod tests { assert_eq!(query[i], s[i] | p[i]); } } + + // ── TD-INT-8: Schema-validated commit path ──────────────────────────── + + use crate::graph::fingerprint::dn_hash; + use crate::graph::spo::store::SpoStore; + use lance_graph_contract::property::Schema; + + fn record_for(predicate_name: &str) -> SpoRecord { + let s = label_fp("Customer:42"); + let p = label_fp(predicate_name); + let o = label_fp(&format!("value:{}", predicate_name)); + SpoBuilder::build_edge(&s, &p, &o, TruthValue::new(0.9, 0.8)) + } + + #[test] + fn validated_commit_without_schema_inserts_unchanged() { + // Without a schema, validate() returns empty and commit_validated + // behaves like a plain insert. + let mut store = SpoStore::new(); + let mut builder = SpoBuilder::new(); + builder.stage("customer_name", dn_hash("k1"), record_for("customer_name")); + + assert!(builder.validate().is_empty()); + let n = builder + .commit_validated(&mut store) + .expect("no schema → no validation failure"); + assert_eq!(n, 1); + assert_eq!(store.len(), 1); + assert_eq!(builder.staged_len(), 0); + } + + #[test] + fn validated_commit_with_complete_schema_succeeds() { + let schema = Schema::builder("Customer") + .required("customer_name") + .required("tax_id") + .optional("address") + .build(); + + let mut store = SpoStore::new(); + let mut builder = SpoBuilder::new().with_schema(schema); + + builder + .stage("customer_name", dn_hash("c:name"), record_for("customer_name")) + .stage("tax_id", dn_hash("c:tax"), record_for("tax_id")) + .stage("address", dn_hash("c:addr"), record_for("address")); + + assert!(builder.validate().is_empty()); + let n = builder + .commit_validated(&mut store) + .expect("all Required predicates present"); + assert_eq!(n, 3); + assert_eq!(store.len(), 3); + } + + #[test] + fn validated_commit_missing_required_returns_failure_ticket() { + let schema = Schema::builder("Customer") + .required("customer_name") + .required("tax_id") // ← intentionally not staged + .optional("address") + .build(); + + let mut store = SpoStore::new(); + let mut builder = SpoBuilder::new().with_schema(schema); + + builder + .stage("customer_name", dn_hash("c:name"), record_for("customer_name")) + .stage("address", dn_hash("c:addr"), record_for("address")); + + let missing = builder.validate(); + assert_eq!(missing, vec!["tax_id"]); + + let err = builder + .commit_validated(&mut store) + .expect_err("tax_id missing → FailureTicket"); + + // Store untouched, staging preserved for retry. + assert_eq!(store.len(), 0); + assert_eq!(builder.staged_len(), 2); + + // Ticket carries the missing predicate names. + let m: Vec<&'static str> = err.missing_predicates().collect(); + assert_eq!(m, vec!["tax_id"]); + } + + #[test] + fn validated_commit_retry_after_filling_missing() { + let schema = Schema::builder("Customer") + .required("customer_name") + .required("tax_id") + .build(); + + let mut store = SpoStore::new(); + let mut builder = SpoBuilder::new().with_schema(schema); + + builder.stage("customer_name", dn_hash("c:name"), record_for("customer_name")); + assert!(builder.commit_validated(&mut store).is_err()); + + // Caller addresses the failure by adding the missing predicate. + builder.stage("tax_id", dn_hash("c:tax"), record_for("tax_id")); + let n = builder.commit_validated(&mut store).expect("now valid"); + assert_eq!(n, 2); + assert_eq!(store.len(), 2); + } } diff --git a/crates/neural-debug/src/lib.rs b/crates/neural-debug/src/lib.rs index 5756900e..4b639dee 100644 --- a/crates/neural-debug/src/lib.rs +++ b/crates/neural-debug/src/lib.rs @@ -9,3 +9,9 @@ pub mod scanner; pub mod diagnosis; pub mod registry; + +// Producer / consumer convenience: `neural_debug::registry()` returns the +// process-wide `RuntimeRegistry`. Producers call `record_row(row, state)`; +// consumers call `diag()` or `snapshot_rows()` to surface state. +pub use registry::{registry, RuntimeDiag, RuntimeRegistry}; +pub use diagnosis::NeuronState; diff --git a/crates/neural-debug/src/registry.rs b/crates/neural-debug/src/registry.rs index 53a8fe65..428e7b66 100644 --- a/crates/neural-debug/src/registry.rs +++ b/crates/neural-debug/src/registry.rs @@ -2,7 +2,7 @@ use crate::diagnosis::NeuronState; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Mutex; +use std::sync::{Mutex, OnceLock}; use std::time::Duration; /// Runtime call counter for a single function. @@ -57,16 +57,109 @@ impl CounterSnapshot { } } -/// Runtime registry: collects call counters from all instrumented functions. -/// Thread-safe via interior mutability. +/// Per-row state recorded by the runtime dispatch path (e.g. cognitive- +/// shader-driver). Maps a BindSpace row to its observed `NeuronState` +/// for the most recent cycle that touched it. +pub struct RowStateMap { + states: Mutex>, +} + +impl RowStateMap { + pub fn new() -> Self { + Self { states: Mutex::new(HashMap::new()) } + } + + pub fn record(&self, row: u32, state: NeuronState) { + let mut map = self.states.lock().unwrap(); + map.insert(row, state); + } + + pub fn snapshot(&self) -> HashMap { + self.states.lock().unwrap().clone() + } + + pub fn reset(&self) { + self.states.lock().unwrap().clear(); + } + + /// Aggregate counts the WireHealth.neural_debug overlay needs. + pub fn diag(&self) -> RuntimeDiag { + let map = self.states.lock().unwrap(); + let mut alive = 0usize; + let mut stat = 0usize; + let mut dead = 0usize; + let mut nan = 0usize; + let mut stub = 0usize; + let mut wired_unused = 0usize; + for state in map.values() { + match state { + NeuronState::Alive => alive += 1, + NeuronState::Static => stat += 1, + NeuronState::Dead => dead += 1, + NeuronState::Nan => nan += 1, + NeuronState::Stub => stub += 1, + NeuronState::WiredUnused => wired_unused += 1, + } + } + let total = map.len(); + let operational = alive + stat; + let health_pct = if total == 0 { + 0.0 + } else { + (operational as f32 / total as f32) * 100.0 + }; + RuntimeDiag { + total_functions: total, + total_dead: dead, + total_stub: stub, + total_nan: nan, + total_alive: alive, + total_static: stat, + total_wired_unused: wired_unused, + health_pct, + } + } +} + +impl Default for RowStateMap { + fn default() -> Self { Self::new() } +} + +/// Aggregate runtime diagnostic produced from a `RowStateMap` snapshot — +/// shaped to match the `WireHealth.neural_debug` overlay so consumers can +/// drop it straight into the response. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeDiag { + pub total_functions: usize, + pub total_dead: usize, + pub total_stub: usize, + pub total_nan: usize, + pub total_alive: usize, + pub total_static: usize, + pub total_wired_unused: usize, + pub health_pct: f32, +} + +/// Runtime registry: collects call counters from all instrumented functions +/// AND the per-row `NeuronState` feed from the dispatch hot path. +/// +/// Thread-safe via interior mutability — `&RuntimeRegistry` is enough for +/// both the shader's `dispatch()` (writer) and the health handler (reader). +/// No `&mut self` anywhere on the hot path. pub struct RuntimeRegistry { counters: Mutex>, + rows: RowStateMap, +} + +impl Default for RuntimeRegistry { + fn default() -> Self { Self::new() } } impl RuntimeRegistry { pub fn new() -> Self { Self { counters: Mutex::new(HashMap::new()), + rows: RowStateMap::new(), } } @@ -84,15 +177,46 @@ impl RuntimeRegistry { } } + /// Record a row's most recent `NeuronState`. Constant-time per call, + /// no allocations beyond the (one-time) HashMap entry insertion. + #[inline] + pub fn record_row(&self, row: u32, state: NeuronState) { + self.rows.record(row, state); + } + pub fn snapshot(&self) -> HashMap { self.counters.lock().unwrap().clone() } + /// Snapshot every row's currently recorded state. + pub fn snapshot_rows(&self) -> HashMap { + self.rows.snapshot() + } + + /// Aggregate runtime diagnostic from the current row-state map. + pub fn diag(&self) -> RuntimeDiag { + self.rows.diag() + } + pub fn reset(&self) { self.counters.lock().unwrap().clear(); + self.rows.reset(); } } +/// Process-wide global runtime registry. +/// +/// Lazy-initialized via `OnceLock`. Producers (e.g. the shader dispatch +/// hot path) call `registry()` once per cycle to record row states; +/// consumers (e.g. the WireHealth handler) call `registry().diag()` to +/// snapshot the aggregate counts. No allocation outside the first call. +static GLOBAL_REGISTRY: OnceLock = OnceLock::new(); + +/// Access the process-wide runtime registry, initializing it on first use. +pub fn registry() -> &'static RuntimeRegistry { + GLOBAL_REGISTRY.get_or_init(RuntimeRegistry::new) +} + /// Dependency probe result — what happened when we tried to call a dependency. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProbeResult { @@ -184,3 +308,83 @@ pub struct ImpactFix { pub fix_complexity: String, pub impact_score: f32, } + +// ═════════════════════════════════════════════════════════════════════ +// Tests — TD-INT-11 row-state runtime registry +// ═════════════════════════════════════════════════════════════════════ + +#[cfg(test)] +mod row_state_tests { + use super::*; + + #[test] + fn row_state_map_records_and_diags() { + let map = RowStateMap::new(); + map.record(0, NeuronState::Alive); + map.record(1, NeuronState::Alive); + map.record(2, NeuronState::Static); + map.record(3, NeuronState::Nan); + + let diag = map.diag(); + assert_eq!(diag.total_functions, 4); + assert_eq!(diag.total_alive, 2); + assert_eq!(diag.total_static, 1); + assert_eq!(diag.total_nan, 1); + assert_eq!(diag.total_dead, 0); + assert_eq!(diag.total_stub, 0); + // 3 of 4 are operational (alive + static); NaN is not + // operational. health = 75%. + assert!((diag.health_pct - 75.0).abs() < 1e-3); + } + + #[test] + fn row_state_map_overwrites_per_row() { + let map = RowStateMap::new(); + map.record(7, NeuronState::Static); + map.record(7, NeuronState::Alive); + let snap = map.snapshot(); + assert_eq!(snap.len(), 1); + assert_eq!(snap[&7], NeuronState::Alive); + } + + #[test] + fn empty_registry_diag_is_zeroed() { + let map = RowStateMap::new(); + let diag = map.diag(); + assert_eq!(diag.total_functions, 0); + assert_eq!(diag.health_pct, 0.0); + } + + #[test] + fn runtime_registry_record_row_round_trips() { + let reg = RuntimeRegistry::new(); + reg.record_row(42, NeuronState::Alive); + reg.record_row(43, NeuronState::Static); + let snap = reg.snapshot_rows(); + assert_eq!(snap[&42], NeuronState::Alive); + assert_eq!(snap[&43], NeuronState::Static); + let diag = reg.diag(); + assert_eq!(diag.total_functions, 2); + assert_eq!(diag.total_alive, 1); + assert_eq!(diag.total_static, 1); + assert!((diag.health_pct - 100.0).abs() < 1e-3); + } + + #[test] + fn global_registry_is_a_single_oncelock() { + // Two calls to `registry()` must return the same address — + // producers and consumers in different modules MUST see the + // same map. + let a = registry() as *const RuntimeRegistry as usize; + let b = registry() as *const RuntimeRegistry as usize; + assert_eq!(a, b); + } + + #[test] + fn runtime_registry_reset_clears_rows() { + let reg = RuntimeRegistry::new(); + reg.record_row(1, NeuronState::Alive); + reg.reset(); + assert_eq!(reg.diag().total_functions, 0); + } +}