diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs deleted file mode 100644 index 6ceea807116..00000000000 --- a/rust/lance-core/src/cache.rs +++ /dev/null @@ -1,806 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors - -//! Cache implementation - -use std::any::{Any, TypeId}; -use std::borrow::Cow; -use std::sync::{ - Arc, - atomic::{AtomicU64, Ordering}, -}; - -use futures::{Future, FutureExt}; -use moka::future::Cache; - -use crate::Result; - -pub use deepsize::{Context, DeepSizeOf}; - -type ArcAny = Arc; - -#[derive(Clone)] -pub struct SizedRecord { - record: ArcAny, - size_accessor: Arc usize + Send + Sync>, -} - -impl std::fmt::Debug for SizedRecord { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SizedRecord") - .field("record", &self.record) - .finish() - } -} - -impl DeepSizeOf for SizedRecord { - fn deep_size_of_children(&self, _: &mut Context) -> usize { - (self.size_accessor)(&self.record) - } -} - -impl SizedRecord { - fn new(record: Arc) -> Self { - // +8 for the size of the Arc pointer itself - let size_accessor = - |record: &ArcAny| -> usize { record.downcast_ref::().unwrap().deep_size_of() + 8 }; - Self { - record, - size_accessor: Arc::new(size_accessor), - } - } -} - -#[derive(Clone)] -pub struct LanceCache { - cache: Arc>, - prefix: String, - hits: Arc, - misses: Arc, -} - -impl std::fmt::Debug for LanceCache { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LanceCache") - .field("cache", &self.cache) - .finish() - } -} - -impl DeepSizeOf for LanceCache { - fn deep_size_of_children(&self, _: &mut Context) -> usize { - self.cache - .iter() - .map(|(_, v)| (v.size_accessor)(&v.record)) - .sum() - } -} - -impl LanceCache { - pub fn with_capacity(capacity: usize) -> Self { - let cache = Cache::builder() - .max_capacity(capacity as u64) - .weigher(|_, v: &SizedRecord| { - (v.size_accessor)(&v.record).try_into().unwrap_or(u32::MAX) - }) - .support_invalidation_closures() - .build(); - Self { - cache: Arc::new(cache), - prefix: String::new(), - hits: Arc::new(AtomicU64::new(0)), - misses: Arc::new(AtomicU64::new(0)), - } - } - - pub fn no_cache() -> Self { - Self { - cache: Arc::new(Cache::new(0)), - prefix: String::new(), - hits: Arc::new(AtomicU64::new(0)), - misses: Arc::new(AtomicU64::new(0)), - } - } - - /// Appends a prefix to the cache key - /// - /// If this cache already has a prefix, the new prefix will be appended to - /// the existing one. - /// - /// Prefixes are used to create a namespace for the cache keys to avoid - /// collisions between different caches. - pub fn with_key_prefix(&self, prefix: &str) -> Self { - Self { - cache: self.cache.clone(), - prefix: format!("{}{}/", self.prefix, prefix), - hits: self.hits.clone(), - misses: self.misses.clone(), - } - } - - fn get_key(&self, key: &str) -> String { - if self.prefix.is_empty() { - key.to_string() - } else { - format!("{}/{}", self.prefix, key) - } - } - - /// Invalidate all entries in the cache that start with the given prefix - /// - /// The given prefix is appended to the existing prefix of the cache. If you - /// want to invalidate all at the current prefix, pass an empty string. - pub fn invalidate_prefix(&self, prefix: &str) { - let full_prefix = format!("{}{}", self.prefix, prefix); - self.cache - .invalidate_entries_if(move |(key, _typeid), _value| key.starts_with(&full_prefix)) - .expect("Cache configured correctly"); - } - - pub async fn size(&self) -> usize { - self.cache.run_pending_tasks().await; - self.cache.entry_count() as usize - } - - pub fn approx_size(&self) -> usize { - self.cache.entry_count() as usize - } - - pub async fn size_bytes(&self) -> usize { - self.cache.run_pending_tasks().await; - self.approx_size_bytes() - } - - pub fn approx_size_bytes(&self) -> usize { - self.cache.weighted_size() as usize - } - - async fn insert(&self, key: &str, metadata: Arc) { - let key = self.get_key(key); - let record = SizedRecord::new(metadata); - tracing::trace!( - target: "lance_cache::insert", - key = key, - type_id = std::any::type_name::(), - size = (record.size_accessor)(&record.record), - ); - self.cache.insert((key, TypeId::of::()), record).await; - } - - pub async fn insert_unsized( - &self, - key: &str, - metadata: Arc, - ) { - // In order to make the data Sized, we wrap in another pointer. - self.insert(key, Arc::new(metadata)).await - } - - async fn get(&self, key: &str) -> Option> { - let key = self.get_key(key); - if let Some(metadata) = self.cache.get(&(key, TypeId::of::())).await { - self.hits.fetch_add(1, Ordering::Relaxed); - Some(metadata.record.clone().downcast::().unwrap()) - } else { - self.misses.fetch_add(1, Ordering::Relaxed); - None - } - } - - pub async fn get_unsized( - &self, - key: &str, - ) -> Option> { - let outer = self.get::>(key).await?; - Some(outer.as_ref().clone()) - } - - /// Get an item - /// - /// If it exists in the cache return that - /// - /// If it doesn't then run `loader` to load the item, insert into cache, and return - async fn get_or_insert( - &self, - key: String, - loader: F, - ) -> Result> - where - F: FnOnce(&str) -> Fut, - Fut: Future> + Send, - { - let full_key = self.get_key(&key); - let cache_key = (full_key, TypeId::of::()); - - // Use optionally_get_with to handle concurrent requests - let hits = self.hits.clone(); - let misses = self.misses.clone(); - - // Use oneshot channels to track both errors and whether init was run - let (error_tx, error_rx) = tokio::sync::oneshot::channel(); - let (init_run_tx, mut init_run_rx) = tokio::sync::oneshot::channel(); - - let init = Box::pin(async move { - let _ = init_run_tx.send(()); - misses.fetch_add(1, Ordering::Relaxed); - match loader(&key).await { - Ok(value) => Some(SizedRecord::new(Arc::new(value))), - Err(e) => { - let _ = error_tx.send(e); - None - } - } - }); - - match self.cache.optionally_get_with(cache_key, init).await { - Some(metadata) => { - // Check if init was run or if this was a cache hit - match init_run_rx.try_recv() { - Ok(()) => { - // Init was run, miss was already recorded - } - Err(_) => { - // Init was not run, this is a cache hit - hits.fetch_add(1, Ordering::Relaxed); - } - } - Ok(metadata.record.clone().downcast::().unwrap()) - } - None => { - // The loader returned an error, retrieve it from the channel - match error_rx.await { - Ok(err) => Err(err), - Err(_) => Err(crate::Error::internal( - "Failed to retrieve error from cache loader", - )), - } - } - } - } - - pub async fn stats(&self) -> CacheStats { - self.cache.run_pending_tasks().await; - CacheStats { - hits: self.hits.load(Ordering::Relaxed), - misses: self.misses.load(Ordering::Relaxed), - num_entries: self.cache.entry_count() as usize, - size_bytes: self.cache.weighted_size() as usize, - } - } - - pub async fn clear(&self) { - self.cache.invalidate_all(); - self.cache.run_pending_tasks().await; - self.hits.store(0, Ordering::Relaxed); - self.misses.store(0, Ordering::Relaxed); - } - - // CacheKey-based methods - pub async fn insert_with_key(&self, cache_key: &K, metadata: Arc) - where - K: CacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - self.insert(&cache_key.key(), metadata).boxed().await - } - - pub async fn get_with_key(&self, cache_key: &K) -> Option> - where - K: CacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - self.get::(&cache_key.key()).boxed().await - } - - pub async fn get_or_insert_with_key( - &self, - cache_key: K, - loader: F, - ) -> Result> - where - K: CacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - F: FnOnce() -> Fut, - Fut: Future> + Send, - { - let key_str = cache_key.key().into_owned(); - Box::pin(self.get_or_insert(key_str, |_| loader())).await - } - - pub async fn insert_unsized_with_key(&self, cache_key: &K, metadata: Arc) - where - K: UnsizedCacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - self.insert_unsized(&cache_key.key(), metadata) - .boxed() - .await - } - - pub async fn get_unsized_with_key(&self, cache_key: &K) -> Option> - where - K: UnsizedCacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - self.get_unsized::(&cache_key.key()) - .boxed() - .await - } -} - -/// A weak reference to a LanceCache, used by indices to avoid circular references. -/// When the original cache is dropped, operations on this will gracefully no-op. -#[derive(Clone, Debug)] -pub struct WeakLanceCache { - inner: std::sync::Weak>, - prefix: String, - hits: Arc, - misses: Arc, -} - -impl WeakLanceCache { - /// Create a weak reference from a strong LanceCache - pub fn from(cache: &LanceCache) -> Self { - Self { - inner: Arc::downgrade(&cache.cache), - prefix: cache.prefix.clone(), - hits: cache.hits.clone(), - misses: cache.misses.clone(), - } - } - - /// Appends a prefix to the cache key - pub fn with_key_prefix(&self, prefix: &str) -> Self { - Self { - inner: self.inner.clone(), - prefix: format!("{}{}/", self.prefix, prefix), - hits: self.hits.clone(), - misses: self.misses.clone(), - } - } - - fn get_key(&self, key: &str) -> String { - if self.prefix.is_empty() { - key.to_string() - } else { - format!("{}/{}", self.prefix, key) - } - } - - /// Get an item from cache if the cache is still alive - pub async fn get(&self, key: &str) -> Option> { - let cache = self.inner.upgrade()?; - let key = self.get_key(key); - if let Some(metadata) = cache.get(&(key, TypeId::of::())).await { - self.hits.fetch_add(1, Ordering::Relaxed); - Some(metadata.record.clone().downcast::().unwrap()) - } else { - self.misses.fetch_add(1, Ordering::Relaxed); - None - } - } - - /// Insert an item if the cache is still alive - /// Returns true if the item was inserted, false if the cache is no longer available - pub async fn insert( - &self, - key: &str, - value: Arc, - ) -> bool { - if let Some(cache) = self.inner.upgrade() { - let key = self.get_key(key); - let record = SizedRecord::new(value); - cache.insert((key, TypeId::of::()), record).await; - true - } else { - log::warn!("WeakLanceCache: cache no longer available, unable to insert item"); - false - } - } - - /// Get or insert an item, computing it if necessary - pub async fn get_or_insert(&self, key: &str, f: F) -> Result> - where - T: DeepSizeOf + Send + Sync + 'static, - F: FnOnce() -> Fut, - Fut: Future> + Send, - { - if let Some(cache) = self.inner.upgrade() { - let full_key = self.get_key(key); - let cache_key = (full_key.clone(), TypeId::of::()); - - // Use optionally_get_with to handle concurrent requests properly - let hits = self.hits.clone(); - let misses = self.misses.clone(); - - // Track whether init was run (for metrics) - let (init_run_tx, mut init_run_rx) = tokio::sync::oneshot::channel(); - let (error_tx, error_rx) = tokio::sync::oneshot::channel(); - - let init = Box::pin(async move { - let _ = init_run_tx.send(()); - misses.fetch_add(1, Ordering::Relaxed); - match f().await { - Ok(value) => Some(SizedRecord::new(Arc::new(value))), - Err(e) => { - let _ = error_tx.send(e); - None - } - } - }); - - match cache.optionally_get_with(cache_key, init).await { - Some(record) => { - // Check if init was run or if this was a cache hit - match init_run_rx.try_recv() { - Ok(()) => { - // Init was run, miss was already recorded - } - Err(_) => { - // Init was not run, this was a cache hit - hits.fetch_add(1, Ordering::Relaxed); - } - } - Ok(record.record.clone().downcast::().unwrap()) - } - None => { - // Init returned None, which means there was an error - match error_rx.await { - Ok(e) => Err(e), - Err(_) => Err(crate::Error::internal( - "Failed to receive error from cache init function".to_string(), - )), - } - } - } - } else { - log::warn!("WeakLanceCache: cache no longer available, computing without caching"); - f().await.map(Arc::new) - } - } - - /// Get or insert an item with a cache key type - pub async fn get_or_insert_with_key( - &self, - cache_key: K, - loader: F, - ) -> Result> - where - K: CacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - F: FnOnce() -> Fut, - Fut: Future> + Send, - { - let key_str = cache_key.key().into_owned(); - self.get_or_insert(&key_str, loader).await - } - - /// Insert with a cache key type - /// Returns true if the item was inserted, false if the cache is no longer available - pub async fn insert_with_key(&self, cache_key: &K, value: Arc) -> bool - where - K: CacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - let key_str = cache_key.key().into_owned(); - self.insert(&key_str, value).await - } - - /// Get with a cache key type - pub async fn get_with_key(&self, cache_key: &K) -> Option> - where - K: CacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - let key_str = cache_key.key().into_owned(); - self.get(&key_str).await - } - - /// Get unsized item from cache - pub async fn get_unsized( - &self, - key: &str, - ) -> Option> { - // For unsized types, we store Arc directly - let cache = self.inner.upgrade()?; - let key = self.get_key(key); - if let Some(metadata) = cache.get(&(key, TypeId::of::>())).await { - metadata - .record - .clone() - .downcast::>() - .ok() - .map(|arc| arc.as_ref().clone()) - } else { - None - } - } - - /// Insert unsized item into cache - pub async fn insert_unsized( - &self, - key: &str, - value: Arc, - ) { - if let Some(cache) = self.inner.upgrade() { - let key = self.get_key(key); - let record = SizedRecord::new(Arc::new(value)); - cache.insert((key, TypeId::of::>()), record).await; - } else { - log::warn!("WeakLanceCache: cache no longer available, unable to insert unsized item"); - } - } - - /// Get unsized with a cache key type - pub async fn get_unsized_with_key(&self, cache_key: &K) -> Option> - where - K: UnsizedCacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - let key_str = cache_key.key(); - self.get_unsized(&key_str).await - } - - /// Insert unsized with a cache key type - pub async fn insert_unsized_with_key(&self, cache_key: &K, value: Arc) - where - K: UnsizedCacheKey, - K::ValueType: DeepSizeOf + Send + Sync + 'static, - { - let key_str = cache_key.key(); - self.insert_unsized(&key_str, value).await - } -} - -pub trait CacheKey { - type ValueType; - - fn key(&self) -> Cow<'_, str>; -} - -pub trait UnsizedCacheKey { - type ValueType: ?Sized; - - fn key(&self) -> Cow<'_, str>; -} - -#[derive(Debug, Clone)] -pub struct CacheStats { - /// Number of times `get`, `get_unsized`, or `get_or_insert` found an item in the cache. - pub hits: u64, - /// Number of times `get`, `get_unsized`, or `get_or_insert` did not find an item in the cache. - pub misses: u64, - /// Number of entries currently in the cache. - pub num_entries: usize, - /// Total size in bytes of all entries in the cache. - pub size_bytes: usize, -} - -impl CacheStats { - pub fn hit_ratio(&self) -> f32 { - if self.hits + self.misses == 0 { - 0.0 - } else { - self.hits as f32 / (self.hits + self.misses) as f32 - } - } - - pub fn miss_ratio(&self) -> f32 { - if self.hits + self.misses == 0 { - 0.0 - } else { - self.misses as f32 / (self.hits + self.misses) as f32 - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_cache_bytes() { - let item = Arc::new(vec![1, 2, 3]); - let item_size = item.deep_size_of(); // Size of Arc> - let capacity = 10 * item_size; - - let cache = LanceCache::with_capacity(capacity); - assert_eq!(cache.size_bytes().await, 0); - assert_eq!(cache.approx_size_bytes(), 0); - - let item = Arc::new(vec![1, 2, 3]); - cache.insert("key", item.clone()).await; - assert_eq!(cache.size().await, 1); - assert_eq!(cache.size_bytes().await, item_size); - assert_eq!(cache.approx_size_bytes(), item_size); - - let retrieved = cache.get::>("key").await.unwrap(); - assert_eq!(*retrieved, *item); - - // Test eviction based on size - for i in 0..20 { - cache - .insert(&format!("key_{}", i), Arc::new(vec![i, i, i])) - .await; - } - assert_eq!(cache.size_bytes().await, capacity); - assert_eq!(cache.size().await, 10); - } - - #[tokio::test] - async fn test_cache_trait_objects() { - #[derive(Debug, DeepSizeOf)] - struct MyType(i32); - - trait MyTrait: DeepSizeOf + Send + Sync + Any { - fn as_any(&self) -> &dyn Any; - } - - impl MyTrait for MyType { - fn as_any(&self) -> &dyn Any { - self - } - } - - let item = Arc::new(MyType(42)); - let item_dyn: Arc = item; - - let cache = LanceCache::with_capacity(1000); - cache.insert_unsized("test", item_dyn).await; - - let retrieved = cache.get_unsized::("test").await.unwrap(); - let retrieved = retrieved.as_any().downcast_ref::().unwrap(); - assert_eq!(retrieved.0, 42); - } - - #[tokio::test] - async fn test_cache_stats_basic() { - let cache = LanceCache::with_capacity(1000); - - // Initially no hits or misses - let stats = cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 0); - - // Miss on first get - let result = cache.get::>("nonexistent"); - assert!(result.await.is_none()); - let stats = cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 1); - - // Insert and then hit - cache.insert("key1", Arc::new(vec![1, 2, 3])).await; - let result = cache.get::>("key1"); - assert!(result.await.is_some()); - let stats = cache.stats().await; - assert_eq!(stats.hits, 1); - assert_eq!(stats.misses, 1); - - // Another hit - let result = cache.get::>("key1"); - assert!(result.await.is_some()); - let stats = cache.stats().await; - assert_eq!(stats.hits, 2); - assert_eq!(stats.misses, 1); - - // Another miss - let result = cache.get::>("nonexistent2"); - assert!(result.await.is_none()); - let stats = cache.stats().await; - assert_eq!(stats.hits, 2); - assert_eq!(stats.misses, 2); - } - - #[tokio::test] - async fn test_cache_stats_with_prefixes() { - let base_cache = LanceCache::with_capacity(1000); - let prefixed_cache = base_cache.with_key_prefix("test"); - - // Stats should be shared between base and prefixed cache - let stats = base_cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 0); - - let stats = prefixed_cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 0); - - // Miss on prefixed cache - let result = prefixed_cache.get::>("key1"); - assert!(result.await.is_none()); - - // Both should show the miss - let stats = base_cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 1); - - let stats = prefixed_cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 1); - - // Insert through prefixed cache and hit - prefixed_cache.insert("key1", Arc::new(vec![1, 2, 3])).await; - let result = prefixed_cache.get::>("key1"); - assert!(result.await.is_some()); - - // Both should show the hit - let stats = base_cache.stats().await; - assert_eq!(stats.hits, 1); - assert_eq!(stats.misses, 1); - - let stats = prefixed_cache.stats().await; - assert_eq!(stats.hits, 1); - assert_eq!(stats.misses, 1); - } - - #[tokio::test] - async fn test_cache_stats_unsized() { - #[derive(Debug, DeepSizeOf)] - struct MyType(i32); - - trait MyTrait: DeepSizeOf + Send + Sync + Any {} - - impl MyTrait for MyType {} - - let cache = LanceCache::with_capacity(1000); - - // Miss on unsized get - let result = cache.get_unsized::("test"); - assert!(result.await.is_none()); - let stats = cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 1); - - // Insert and hit on unsized - let item = Arc::new(MyType(42)); - let item_dyn: Arc = item; - cache.insert_unsized("test", item_dyn).await; - - let result = cache.get_unsized::("test"); - assert!(result.await.is_some()); - let stats = cache.stats().await; - assert_eq!(stats.hits, 1); - assert_eq!(stats.misses, 1); - } - - #[tokio::test] - async fn test_cache_stats_get_or_insert() { - let cache = LanceCache::with_capacity(1000); - - // First call should be a miss and load the value - let result: Arc> = cache - .get_or_insert("key1".to_string(), |_key| async { Ok(vec![1, 2, 3]) }) - .await - .unwrap(); - assert_eq!(*result, vec![1, 2, 3]); - - let stats = cache.stats().await; - assert_eq!(stats.hits, 0); - assert_eq!(stats.misses, 1); - - // Second call should be a hit - let result: Arc> = cache - .get_or_insert("key1".to_string(), |_key| async { - panic!("Should not be called") - }) - .await - .unwrap(); - assert_eq!(*result, vec![1, 2, 3]); - - let stats = cache.stats().await; - assert_eq!(stats.hits, 1); - assert_eq!(stats.misses, 1); - - // Different key should be another miss - let result: Arc> = cache - .get_or_insert("key2".to_string(), |_key| async { Ok(vec![4, 5, 6]) }) - .await - .unwrap(); - assert_eq!(*result, vec![4, 5, 6]); - - let stats = cache.stats().await; - assert_eq!(stats.hits, 1); - assert_eq!(stats.misses, 2); - } -} diff --git a/rust/lance-core/src/cache/backend.rs b/rust/lance-core/src/cache/backend.rs new file mode 100644 index 00000000000..54b24944ab0 --- /dev/null +++ b/rust/lance-core/src/cache/backend.rs @@ -0,0 +1,122 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Backend interface for cache implementors. +//! +//! This module defines the trait that custom cache backends must implement, +//! along with the key and entry types they operate on. Most callers should +//! use [`LanceCache`](super::LanceCache) instead of interacting with +//! backends directly. + +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::Future; + +use crate::Result; + +/// A type-erased cache entry. +pub type CacheEntry = Arc; + +/// Structured cache key passed to [`CacheBackend`] methods. +/// +/// Composed of three parts: +/// - **prefix**: scopes the key to a dataset or index (e.g. `"s3://bucket/dataset/"`) +/// - **key**: identifies the specific entry (e.g. `"42"` for a version number) +/// - **type_name**: distinguishes different value types stored under the same +/// user key (e.g. `"Vec"`) +/// +/// [`LanceCache`](super::LanceCache) constructs these automatically from +/// [`CacheKey`](super::CacheKey) values; backend authors receive them +/// ready-made. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct InternalCacheKey { + prefix: Arc, + key: Arc, + type_name: &'static str, +} + +impl InternalCacheKey { + pub fn new(prefix: Arc, key: Arc, type_name: &'static str) -> Self { + Self { + prefix, + key, + type_name, + } + } + + pub fn prefix(&self) -> &str { + &self.prefix + } + + pub fn key(&self) -> &str { + &self.key + } + + pub fn type_name(&self) -> &'static str { + self.type_name + } + + /// Returns true if this key's prefix starts with the given string. + pub fn starts_with(&self, prefix: &str) -> bool { + self.prefix.starts_with(prefix) + } +} + +/// Low-level pluggable cache backend. +/// +/// Implementations store entries keyed by [`InternalCacheKey`] and return +/// type-erased [`CacheEntry`] values. +/// [`LanceCache`](super::LanceCache) handles key construction and type safety; +/// backend authors only need to implement storage and eviction. +#[async_trait] +pub trait CacheBackend: Send + Sync + std::fmt::Debug { + /// Look up an entry by its key. + async fn get(&self, key: &InternalCacheKey) -> Option; + + /// Store an entry. `size_bytes` is used for eviction accounting. + async fn insert(&self, key: &InternalCacheKey, entry: CacheEntry, size_bytes: usize); + + /// Get an existing entry or compute it from `loader`. + /// + /// Implementations should deduplicate concurrent loads for the same key + /// so the loader runs at most once. + /// + /// Returns `(entry, was_cached)` where `was_cached` is `true` if the entry + /// was already present in the cache (the loader was not invoked). + async fn get_or_insert<'a>( + &self, + key: &InternalCacheKey, + loader: Pin> + Send + 'a>>, + ) -> Result<(CacheEntry, bool)>; + + /// Remove all entries whose prefix starts with the given string. + async fn invalidate_prefix(&self, prefix: &str); + + /// Remove all entries. + async fn clear(&self); + + /// Number of entries currently stored (may flush pending operations). + async fn num_entries(&self) -> usize; + + /// Total weighted size in bytes of all stored entries (may flush pending operations). + async fn size_bytes(&self) -> usize; + + /// Approximate number of entries, callable from synchronous contexts. + /// Backends that cannot provide this cheaply should return 0. + fn approx_num_entries(&self) -> usize { + 0 + } + + /// Approximate weighted size in bytes, callable from synchronous contexts. + /// Used by `DeepSizeOf` to report cache memory usage. + /// Backends that cannot provide this cheaply should return 0. + /// + /// Assumes entries do not share underlying buffers; if they do, the + /// returned total may overcount. + fn approx_size_bytes(&self) -> usize { + 0 + } +} diff --git a/rust/lance-core/src/cache/mod.rs b/rust/lance-core/src/cache/mod.rs new file mode 100644 index 00000000000..43bb233df72 --- /dev/null +++ b/rust/lance-core/src/cache/mod.rs @@ -0,0 +1,811 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Lance cache system. +//! +//! ## For cache users +//! +//! Use [`LanceCache`] (or [`WeakLanceCache`]) to store and retrieve typed +//! values. Define a [`CacheKey`] (or [`UnsizedCacheKey`] for trait objects) to +//! describe what you're caching and its type. +//! +//! ## For backend implementors +//! +//! Implement [`CacheBackend`] to provide a custom storage layer (disk, Redis, +//! etc.). Backends receive [`InternalCacheKey`] keys and type-erased +//! [`CacheEntry`] values — the typed wrapping is handled by [`LanceCache`]. +//! See the [`backend`] module for details. + +pub mod backend; +mod moka; + +pub use backend::{CacheBackend, CacheEntry, InternalCacheKey}; +pub use moka::MokaCacheBackend; + +use std::borrow::Cow; +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +use futures::{Future, FutureExt}; + +use crate::Result; + +pub use deepsize::{Context, DeepSizeOf}; + +// --------------------------------------------------------------------------- +// CacheKey / UnsizedCacheKey — typed key traits for cache users +// --------------------------------------------------------------------------- + +/// Typed cache key for sized value types. +/// +/// Implement this trait to define a new type of cached entry. [`LanceCache`] +/// uses the key string and type name to construct an [`InternalCacheKey`] +/// for the backend. +/// +/// # Example +/// +/// ```ignore +/// struct MyKey { id: u64 } +/// +/// impl CacheKey for MyKey { +/// type ValueType = MyData; +/// fn key(&self) -> Cow<'_, str> { self.id.to_string().into() } +/// fn type_name() -> &'static str { "MyData" } +/// } +/// ``` +pub trait CacheKey { + type ValueType: 'static; + + fn key(&self) -> Cow<'_, str>; + + /// Short, stable string identifying this value type. + /// + /// Two `CacheKey` impls that store different `ValueType`s **must** return + /// different type names; if they collide, gets will silently return `None` + /// due to failed downcasts. + /// + /// Use a short literal (e.g. `"Vec"`), not + /// `std::any::type_name` — the latter is not guaranteed stable across + /// compiler versions or build configurations. + fn type_name() -> &'static str; +} + +/// Like [`CacheKey`] but for unsized value types (e.g. `dyn Trait`). +/// +/// The cache wraps values in an extra `Arc` layer internally; callers pass +/// and receive `Arc` where `T: ?Sized`. +pub trait UnsizedCacheKey { + type ValueType: 'static + ?Sized; + + fn key(&self) -> Cow<'_, str>; + + /// Short, stable string identifying this value type. + /// See [`CacheKey::type_name`] for requirements. + fn type_name() -> &'static str; +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +/// Size of a cached `Arc`, accounting for the Arc overhead (two atomic counters). +fn cache_entry_size(value: &T) -> usize { + value.deep_size_of() + std::mem::size_of::() * 2 +} + +/// Build an [`InternalCacheKey`] from a cache's prefix, a user key string, +/// and a type name. +fn build_key(prefix: &Arc, key: &str, type_name: &'static str) -> InternalCacheKey { + InternalCacheKey::new(prefix.clone(), Arc::from(key), type_name) +} + +// --------------------------------------------------------------------------- +// LanceCache — typed wrapper around dyn CacheBackend +// --------------------------------------------------------------------------- + +/// Typed cache wrapper that handles key construction and type safety. +/// +/// Internally delegates to a [`CacheBackend`]. The default backend is +/// [`MokaCacheBackend`]; pass a custom backend via [`LanceCache::with_backend`]. +#[derive(Clone)] +pub struct LanceCache { + cache: Arc, + prefix: Arc, + hits: Arc, + misses: Arc, +} + +impl std::fmt::Debug for LanceCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LanceCache") + .field("cache", &self.cache) + .finish() + } +} + +impl DeepSizeOf for LanceCache { + fn deep_size_of_children(&self, _: &mut Context) -> usize { + self.cache.approx_size_bytes() + } +} + +impl LanceCache { + pub fn with_capacity(capacity: usize) -> Self { + Self { + cache: Arc::new(MokaCacheBackend::with_capacity(capacity)), + prefix: Arc::from(""), + hits: Arc::new(AtomicU64::new(0)), + misses: Arc::new(AtomicU64::new(0)), + } + } + + /// Create a cache backed by a custom [`CacheBackend`]. + pub fn with_backend(backend: Arc) -> Self { + Self { + cache: backend, + prefix: Arc::from(""), + hits: Arc::new(AtomicU64::new(0)), + misses: Arc::new(AtomicU64::new(0)), + } + } + + pub fn no_cache() -> Self { + Self { + cache: Arc::new(MokaCacheBackend::no_cache()), + prefix: Arc::from(""), + hits: Arc::new(AtomicU64::new(0)), + misses: Arc::new(AtomicU64::new(0)), + } + } + + /// Create a cache with the given backend and an exact prefix string. + /// Unlike `with_key_prefix`, this sets the prefix verbatim (no trailing slash added). + pub fn with_backend_and_prefix(backend: Arc, prefix: String) -> Self { + Self { + cache: backend, + prefix: Arc::from(prefix), + hits: Arc::new(AtomicU64::new(0)), + misses: Arc::new(AtomicU64::new(0)), + } + } + + /// Appends a prefix to the cache key. + pub fn with_key_prefix(&self, prefix: &str) -> Self { + Self { + cache: self.cache.clone(), + prefix: Arc::from(format!("{}{}/", self.prefix, prefix)), + hits: self.hits.clone(), + misses: self.misses.clone(), + } + } + + /// Invalidate all entries whose prefix starts with the given string. + pub async fn invalidate_prefix(&self, prefix: &str) { + let full_prefix = format!("{}{}", self.prefix, prefix); + self.cache.invalidate_prefix(&full_prefix).await; + } + + pub async fn size(&self) -> usize { + self.cache.num_entries().await + } + + pub fn approx_size(&self) -> usize { + self.cache.approx_num_entries() + } + + pub async fn size_bytes(&self) -> usize { + self.cache.size_bytes().await + } + + // -- Sized insert/get (internal, shared by sized and unsized paths) -------- + + async fn insert_with_id( + &self, + key: &str, + type_name: &'static str, + metadata: Arc, + ) { + let size = cache_entry_size(&*metadata); + let cache_key = build_key(&self.prefix, key, type_name); + self.cache.insert(&cache_key, metadata, size).await; + } + + async fn get_with_id( + &self, + key: &str, + type_name: &'static str, + ) -> Option> { + let cache_key = build_key(&self.prefix, key, type_name); + if let Some(entry) = self.cache.get(&cache_key).await { + match entry.downcast::() { + Ok(val) => { + self.hits.fetch_add(1, Ordering::Relaxed); + Some(val) + } + Err(_) => { + // Type mismatch: the backend returned a different concrete + // type than expected (e.g. a disk cache may store + // intermediate state). Treat as a miss. + self.misses.fetch_add(1, Ordering::Relaxed); + None + } + } + } else { + self.misses.fetch_add(1, Ordering::Relaxed); + None + } + } + + // -- Stats / clear -------------------------------------------------------- + + pub async fn stats(&self) -> CacheStats { + CacheStats { + hits: self.hits.load(Ordering::Relaxed), + misses: self.misses.load(Ordering::Relaxed), + num_entries: self.cache.num_entries().await, + size_bytes: self.cache.size_bytes().await, + } + } + + pub async fn clear(&self) { + self.cache.clear().await; + self.hits.store(0, Ordering::Relaxed); + self.misses.store(0, Ordering::Relaxed); + } + + // -- CacheKey-based methods ----------------------------------------------- + + pub async fn insert_with_key(&self, cache_key: &K, metadata: Arc) + where + K: CacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + self.insert_with_id(&cache_key.key(), K::type_name(), metadata) + .boxed() + .await + } + + pub async fn get_with_key(&self, cache_key: &K) -> Option> + where + K: CacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + self.get_with_id::(&cache_key.key(), K::type_name()) + .boxed() + .await + } + + pub async fn get_or_insert_with_key( + &self, + cache_key: K, + loader: F, + ) -> Result> + where + K: CacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + F: FnOnce() -> Fut + Send, + Fut: Future> + Send, + { + let key = build_key(&self.prefix, &cache_key.key(), K::type_name()); + + let typed_loader = Box::pin(async move { + let value = loader().await?; + let arc = Arc::new(value); + let size = cache_entry_size(&*arc); + Ok((arc as CacheEntry, size)) + }); + + let (entry, was_cached) = self.cache.get_or_insert(&key, typed_loader).await?; + + if was_cached { + self.hits.fetch_add(1, Ordering::Relaxed); + } else { + self.misses.fetch_add(1, Ordering::Relaxed); + } + + Ok(entry.downcast::().unwrap()) + } + + pub async fn insert_unsized_with_key(&self, cache_key: &K, metadata: Arc) + where + K: UnsizedCacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + self.insert_with_id(&cache_key.key(), K::type_name(), Arc::new(metadata)) + .boxed() + .await + } + + pub async fn get_unsized_with_key(&self, cache_key: &K) -> Option> + where + K: UnsizedCacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + let outer = self + .get_with_id::>(&cache_key.key(), K::type_name()) + .boxed() + .await?; + Some(outer.as_ref().clone()) + } +} + +// --------------------------------------------------------------------------- +// WeakLanceCache +// --------------------------------------------------------------------------- + +/// A weak reference to a LanceCache, used by indices to avoid circular references. +/// When the original cache is dropped, operations on this will gracefully no-op. +#[derive(Clone, Debug)] +pub struct WeakLanceCache { + inner: std::sync::Weak, + prefix: Arc, + hits: Arc, + misses: Arc, +} + +impl WeakLanceCache { + pub fn from(cache: &LanceCache) -> Self { + Self { + inner: Arc::downgrade(&cache.cache), + prefix: cache.prefix.clone(), + hits: cache.hits.clone(), + misses: cache.misses.clone(), + } + } + + pub fn with_key_prefix(&self, prefix: &str) -> Self { + Self { + inner: self.inner.clone(), + prefix: Arc::from(format!("{}{}/", self.prefix, prefix)), + hits: self.hits.clone(), + misses: self.misses.clone(), + } + } + + /// The key prefix used for all entries in this cache. + pub fn prefix(&self) -> &str { + &self.prefix + } + + pub async fn get_with_key(&self, cache_key: &K) -> Option> + where + K: CacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + let cache = self.inner.upgrade()?; + let key = build_key(&self.prefix, &cache_key.key(), K::type_name()); + if let Some(entry) = cache.get(&key).await { + self.hits.fetch_add(1, Ordering::Relaxed); + Some(entry.downcast::().unwrap()) + } else { + self.misses.fetch_add(1, Ordering::Relaxed); + None + } + } + + pub async fn insert_with_key(&self, cache_key: &K, value: Arc) -> bool + where + K: CacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + if let Some(cache) = self.inner.upgrade() { + let size = cache_entry_size(&*value); + let key = build_key(&self.prefix, &cache_key.key(), K::type_name()); + cache.insert(&key, value, size).await; + true + } else { + log::warn!("WeakLanceCache: cache no longer available, unable to insert item"); + false + } + } + + /// Get or insert an item, computing it if necessary. + /// + /// Deduplication of concurrent loads is handled by the backend. + pub async fn get_or_insert_with_key( + &self, + cache_key: K, + loader: F, + ) -> Result> + where + K: CacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + F: FnOnce() -> Fut + Send, + Fut: Future> + Send, + { + if let Some(cache) = self.inner.upgrade() { + let key = build_key(&self.prefix, &cache_key.key(), K::type_name()); + let typed_loader = Box::pin(async move { + let value = loader().await?; + let arc = Arc::new(value); + let size = cache_entry_size(&*arc); + Ok((arc as CacheEntry, size)) + }); + let (entry, was_cached) = cache.get_or_insert(&key, typed_loader).await?; + if was_cached { + self.hits.fetch_add(1, Ordering::Relaxed); + } else { + self.misses.fetch_add(1, Ordering::Relaxed); + } + Ok(entry.downcast::().unwrap()) + } else { + log::warn!("WeakLanceCache: cache no longer available, computing without caching"); + loader().await.map(Arc::new) + } + } + + pub async fn get_unsized_with_key(&self, cache_key: &K) -> Option> + where + K: UnsizedCacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + let cache = self.inner.upgrade()?; + let key = build_key(&self.prefix, &cache_key.key(), K::type_name()); + if let Some(entry) = cache.get(&key).await { + entry + .downcast::>() + .ok() + .map(|arc| arc.as_ref().clone()) + } else { + None + } + } + + pub async fn insert_unsized_with_key(&self, cache_key: &K, value: Arc) + where + K: UnsizedCacheKey, + K::ValueType: DeepSizeOf + Send + Sync + 'static, + { + if let Some(cache) = self.inner.upgrade() { + let wrapper = Arc::new(value); + let size = cache_entry_size(&*wrapper); + let key = build_key(&self.prefix, &cache_key.key(), K::type_name()); + cache.insert(&key, wrapper, size).await; + } else { + log::warn!("WeakLanceCache: cache no longer available, unable to insert unsized item"); + } + } +} + +// --------------------------------------------------------------------------- +// CacheStats +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone)] +pub struct CacheStats { + /// Number of times `get`, `get_unsized`, or `get_or_insert` found an item in the cache. + pub hits: u64, + /// Number of times `get`, `get_unsized`, or `get_or_insert` did not find an item in the cache. + pub misses: u64, + /// Number of entries currently in the cache. + pub num_entries: usize, + /// Total size in bytes of all entries in the cache. + pub size_bytes: usize, +} + +impl CacheStats { + pub fn hit_ratio(&self) -> f32 { + if self.hits + self.misses == 0 { + 0.0 + } else { + self.hits as f32 / (self.hits + self.misses) as f32 + } + } + + pub fn miss_ratio(&self) -> f32 { + if self.hits + self.misses == 0 { + 0.0 + } else { + self.misses as f32 / (self.hits + self.misses) as f32 + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::marker::PhantomData; + + struct TestKey { + key: String, + _phantom: PhantomData, + } + + impl TestKey { + fn new(key: &str) -> Self { + Self { + key: key.to_string(), + _phantom: PhantomData, + } + } + } + + impl CacheKey for TestKey { + type ValueType = T; + fn key(&self) -> std::borrow::Cow<'_, str> { + std::borrow::Cow::Borrowed(&self.key) + } + fn type_name() -> &'static str { + std::any::type_name::() + } + } + + /// Test helper: an UnsizedCacheKey for trait object values. + struct TestUnsizedKey { + key: String, + _phantom: PhantomData, + } + + impl TestUnsizedKey { + fn new(key: &str) -> Self { + Self { + key: key.to_string(), + _phantom: PhantomData, + } + } + } + + impl UnsizedCacheKey for TestUnsizedKey { + type ValueType = T; + fn key(&self) -> std::borrow::Cow<'_, str> { + std::borrow::Cow::Borrowed(&self.key) + } + fn type_name() -> &'static str { + std::any::type_name::() + } + } + + #[tokio::test] + async fn test_cache_bytes() { + let item = Arc::new(vec![1, 2, 3]); + let item_size = item.deep_size_of(); + let capacity = 10 * item_size; + let cache = LanceCache::with_capacity(capacity); + + cache + .insert_with_key(&TestKey::>::new("key"), item.clone()) + .await; + assert_eq!(cache.size().await, 1); + + let retrieved = cache + .get_with_key(&TestKey::>::new("key")) + .await + .unwrap(); + assert_eq!(*retrieved, *item); + + for i in 0..20 { + cache + .insert_with_key( + &TestKey::>::new(&format!("key_{}", i)), + Arc::new(vec![i, i, i]), + ) + .await; + } + assert!(cache.size_bytes().await <= capacity); + } + + #[tokio::test] + async fn test_cache_trait_objects() { + #[derive(Debug, DeepSizeOf)] + struct MyType(i32); + + trait MyTrait: DeepSizeOf + Send + Sync + std::any::Any { + fn as_any(&self) -> &dyn std::any::Any; + } + + impl MyTrait for MyType { + fn as_any(&self) -> &dyn std::any::Any { + self + } + } + + let item: Arc = Arc::new(MyType(42)); + let cache = LanceCache::with_capacity(1000); + cache + .insert_unsized_with_key(&TestUnsizedKey::::new("test"), item) + .await; + + let retrieved = cache + .get_unsized_with_key(&TestUnsizedKey::::new("test")) + .await + .unwrap(); + assert_eq!(retrieved.as_any().downcast_ref::().unwrap().0, 42); + } + + #[tokio::test] + async fn test_cache_stats_basic() { + let cache = LanceCache::with_capacity(1000); + assert_eq!(cache.stats().await.hits, 0); + + // Miss + assert!( + cache + .get_with_key(&TestKey::>::new("x")) + .await + .is_none() + ); + assert_eq!(cache.stats().await.misses, 1); + + // Insert then hit + cache + .insert_with_key(&TestKey::new("k"), Arc::new(vec![1, 2, 3])) + .await; + assert!( + cache + .get_with_key(&TestKey::>::new("k")) + .await + .is_some() + ); + assert_eq!(cache.stats().await.hits, 1); + } + + #[tokio::test] + async fn test_cache_stats_with_prefixes() { + let base = LanceCache::with_capacity(1000); + let prefixed = base.with_key_prefix("ns"); + + assert!( + prefixed + .get_with_key(&TestKey::>::new("k")) + .await + .is_none() + ); + assert_eq!(base.stats().await.misses, 1); + + prefixed + .insert_with_key(&TestKey::new("k"), Arc::new(vec![1])) + .await; + assert!( + prefixed + .get_with_key(&TestKey::>::new("k")) + .await + .is_some() + ); + assert_eq!(base.stats().await.hits, 1); + } + + #[tokio::test] + async fn test_cache_get_or_insert() { + let cache = LanceCache::with_capacity(1000); + + let v: Arc> = cache + .get_or_insert_with_key(TestKey::>::new("k"), || async { + Ok(vec![1, 2, 3]) + }) + .await + .unwrap(); + assert_eq!(*v, vec![1, 2, 3]); + assert_eq!(cache.stats().await.misses, 1); + assert_eq!(cache.stats().await.hits, 0); + + // Second call should not invoke loader and should be a hit + let v: Arc> = cache + .get_or_insert_with_key(TestKey::>::new("k"), || async { + panic!("should not be called") + }) + .await + .unwrap(); + assert_eq!(*v, vec![1, 2, 3]); + assert_eq!(cache.stats().await.hits, 1); + } + + #[tokio::test] + async fn test_custom_backend() { + use async_trait::async_trait; + use tokio::sync::Mutex; + + #[derive(Debug)] + struct HashMapBackend { + map: Mutex>, + } + + impl HashMapBackend { + fn new() -> Self { + Self { + map: Mutex::new(HashMap::new()), + } + } + } + + #[async_trait] + impl CacheBackend for HashMapBackend { + async fn get(&self, key: &InternalCacheKey) -> Option { + self.map.lock().await.get(key).map(|(e, _)| e.clone()) + } + async fn insert(&self, key: &InternalCacheKey, entry: CacheEntry, size_bytes: usize) { + self.map + .lock() + .await + .insert(key.clone(), (entry, size_bytes)); + } + async fn get_or_insert<'a>( + &self, + key: &InternalCacheKey, + loader: std::pin::Pin< + Box> + Send + 'a>, + >, + ) -> Result<(CacheEntry, bool)> { + if let Some((entry, _)) = self.map.lock().await.get(key) { + Ok((entry.clone(), true)) + } else { + let (entry, size) = loader.await?; + self.map + .lock() + .await + .insert(key.clone(), (entry.clone(), size)); + Ok((entry, false)) + } + } + async fn invalidate_prefix(&self, prefix: &str) { + self.map.lock().await.retain(|k, _| !k.starts_with(prefix)); + } + async fn clear(&self) { + self.map.lock().await.clear(); + } + async fn num_entries(&self) -> usize { + self.map.lock().await.len() + } + async fn size_bytes(&self) -> usize { + self.map.lock().await.values().map(|(_, s)| *s).sum() + } + } + + let cache = LanceCache::with_backend(Arc::new(HashMapBackend::new())); + + cache + .insert_with_key(&TestKey::new("k"), Arc::new(vec![1, 2, 3])) + .await; + assert!( + cache + .get_with_key(&TestKey::>::new("k")) + .await + .is_some() + ); + // Different type at same key = miss + assert!( + cache + .get_with_key(&TestKey::>::new("k")) + .await + .is_none() + ); + } + + #[tokio::test] + async fn test_get_or_insert_dedup() { + use std::sync::atomic::AtomicUsize; + + let load_count = Arc::new(AtomicUsize::new(0)); + let cache = LanceCache::with_capacity(10000); + + let (barrier_tx, _) = tokio::sync::broadcast::channel::<()>(1); + let mut handles = Vec::new(); + for _ in 0..5 { + let cache = cache.clone(); + let load_count = load_count.clone(); + let mut barrier_rx = barrier_tx.subscribe(); + handles.push(tokio::spawn(async move { + barrier_rx.recv().await.ok(); + cache + .get_or_insert_with_key(TestKey::>::new("key"), || { + let load_count = load_count.clone(); + async move { + load_count.fetch_add(1, Ordering::SeqCst); + tokio::task::yield_now().await; + Ok(vec![1, 2, 3]) + } + }) + .await + })); + } + barrier_tx.send(()).unwrap(); + for h in handles { + let result: Arc> = h.await.unwrap().unwrap(); + assert_eq!(*result, vec![1, 2, 3]); + } + + assert_eq!(load_count.load(Ordering::SeqCst), 1); + } +} diff --git a/rust/lance-core/src/cache/moka.rs b/rust/lance-core/src/cache/moka.rs new file mode 100644 index 00000000000..05cb1e5909f --- /dev/null +++ b/rust/lance-core/src/cache/moka.rs @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +use async_trait::async_trait; +use futures::Future; + +use crate::Result; + +use super::backend::{CacheBackend, CacheEntry, InternalCacheKey}; + +/// Internal record stored in the moka cache. +#[derive(Clone, Debug)] +struct MokaCacheEntry { + entry: CacheEntry, + size_bytes: usize, +} + +/// Default [`CacheBackend`] backed by a [moka](https://crates.io/crates/moka) cache. +/// +/// Provides weighted-capacity eviction and concurrent-load deduplication +/// via moka's built-in `optionally_get_with`. +pub struct MokaCacheBackend { + cache: moka::future::Cache, +} + +impl std::fmt::Debug for MokaCacheBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MokaCacheBackend") + .field("entry_count", &self.cache.entry_count()) + .finish() + } +} + +impl MokaCacheBackend { + pub fn with_capacity(capacity: usize) -> Self { + let cache = moka::future::Cache::builder() + .max_capacity(capacity as u64) + .weigher(|_, v: &MokaCacheEntry| v.size_bytes.try_into().unwrap_or(u32::MAX)) + .support_invalidation_closures() + .build(); + Self { cache } + } + + pub fn no_cache() -> Self { + Self { + cache: moka::future::Cache::new(0), + } + } +} + +#[async_trait] +impl CacheBackend for MokaCacheBackend { + async fn get(&self, key: &InternalCacheKey) -> Option { + self.cache.get(key).await.map(|r| r.entry) + } + + async fn insert(&self, key: &InternalCacheKey, entry: CacheEntry, size_bytes: usize) { + self.cache + .insert(key.clone(), MokaCacheEntry { entry, size_bytes }) + .await; + } + + async fn get_or_insert<'a>( + &self, + key: &InternalCacheKey, + loader: Pin> + Send + 'a>>, + ) -> Result<(CacheEntry, bool)> { + // Use moka's built-in dedup: optionally_get_with runs the init future + // at most once per key, even under concurrent access. + let (error_tx, error_rx) = tokio::sync::oneshot::channel(); + + // Track whether the loader actually ran (= cache miss). + let was_miss = Arc::new(AtomicBool::new(false)); + let was_miss_clone = was_miss.clone(); + + let init = async move { + was_miss_clone.store(true, Ordering::Relaxed); + match loader.await { + Ok((entry, size_bytes)) => Some(MokaCacheEntry { entry, size_bytes }), + Err(e) => { + let _ = error_tx.send(e); + None + } + } + }; + + let owned_key = key.clone(); + match self.cache.optionally_get_with(owned_key, init).await { + Some(record) => { + let was_cached = !was_miss.load(Ordering::Relaxed); + Ok((record.entry, was_cached)) + } + None => match error_rx.await { + Ok(err) => Err(err), + Err(_) => Err(crate::Error::internal( + "Failed to retrieve error from cache loader", + )), + }, + } + } + + async fn invalidate_prefix(&self, prefix: &str) { + let prefix = prefix.to_owned(); + self.cache + .invalidate_entries_if(move |key, _value| key.starts_with(&prefix)) + .expect("Cache configured correctly"); + } + + async fn clear(&self) { + self.cache.invalidate_all(); + self.cache.run_pending_tasks().await; + } + + async fn num_entries(&self) -> usize { + self.cache.run_pending_tasks().await; + self.cache.entry_count() as usize + } + + async fn size_bytes(&self) -> usize { + self.cache.run_pending_tasks().await; + self.cache.weighted_size() as usize + } + + fn approx_num_entries(&self) -> usize { + self.cache.entry_count() as usize + } + + fn approx_size_bytes(&self) -> usize { + // Iterate rather than using `weighted_size()` because moka's + // weighted_size can be stale without `run_pending_tasks()`, which + // is async and can't be called from this synchronous context. + self.cache.iter().map(|(_, v)| v.size_bytes).sum() + } +} diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 0e3db7e9a54..f4f8b6e8627 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -3416,6 +3416,10 @@ impl CacheKey for FieldDataCacheKey { fn key(&self) -> std::borrow::Cow<'_, str> { self.column_index.to_string().into() } + + fn type_name() -> &'static str { + "FieldData" + } } impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler { diff --git a/rust/lance-file/src/previous/reader.rs b/rust/lance-file/src/previous/reader.rs index 863aca1afc6..9e1fc175d04 100644 --- a/rust/lance-file/src/previous/reader.rs +++ b/rust/lance-file/src/previous/reader.rs @@ -83,12 +83,19 @@ impl<'a, T> StringCacheKey<'a, T> { } } -impl CacheKey for StringCacheKey<'_, T> { +impl CacheKey for StringCacheKey<'_, T> { type ValueType = T; fn key(&self) -> Cow<'_, str> { self.key.into() } + + fn type_name() -> &'static str { + // This is a private, crate-internal key that is only instantiated with + // a single concrete T within one build, so std::any::type_name is fine + // here — there is no cross-crate collision risk. + std::any::type_name::() + } } impl FileReader { @@ -238,7 +245,7 @@ impl FileReader { loader: F, ) -> Result> where - F: Fn(&str) -> Fut, + F: Fn(&str) -> Fut + Send + Sync, Fut: Future> + Send, { if let Some(cache) = cache { diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 2f0b1c03760..05405344efd 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -128,6 +128,10 @@ impl CacheKey for BitmapKey { fn key(&self) -> std::borrow::Cow<'_, str> { format!("{}", self.value.0).into() } + + fn type_name() -> &'static str { + "Bitmap" + } } impl BitmapIndex { diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 17c180dbf4a..3fa05dfd131 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -990,6 +990,10 @@ impl CacheKey for BTreePageKey { fn key(&self) -> std::borrow::Cow<'_, str> { format!("page-{}", self.page_number).into() } + + fn type_name() -> &'static str { + "BTreePage" + } } /// Note: this is very similar to the IVF index except we store the IVF part in a btree diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index b273ef4b740..88cecddd697 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1888,6 +1888,10 @@ impl CacheKey for PostingListKey { fn key(&self) -> std::borrow::Cow<'_, str> { format!("postings-{}", self.token_id).into() } + + fn type_name() -> &'static str { + "PostingList" + } } #[derive(Debug, Clone)] @@ -1901,6 +1905,10 @@ impl CacheKey for PositionKey { fn key(&self) -> std::borrow::Cow<'_, str> { format!("positions-{}", self.token_id).into() } + + fn type_name() -> &'static str { + "Position" + } } #[derive(Debug, Clone, PartialEq)] diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index 486efe6e034..4e614d99d99 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -170,6 +170,10 @@ impl CacheKey for NGramPostingListKey { fn key(&self) -> std::borrow::Cow<'_, str> { format!("posting-list-{}", self.row_offset).into() } + + fn type_name() -> &'static str { + "NGramPostingList" + } } impl NGramPostingList { diff --git a/rust/lance-index/src/scalar/rtree.rs b/rust/lance-index/src/scalar/rtree.rs index 9600c94823b..920a59bb4b2 100644 --- a/rust/lance-index/src/scalar/rtree.rs +++ b/rust/lance-index/src/scalar/rtree.rs @@ -249,6 +249,10 @@ impl CacheKey for RTreeCacheKey { Self::Nulls => "nulls".into(), } } + + fn type_name() -> &'static str { + "RTree" + } } #[derive(Clone)] diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 36ec0e18e66..ced83badaf3 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -2,6 +2,8 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use std::{collections::HashMap, sync::Arc, time::Duration}; +use lance_core::cache::CacheBackend; + use super::refs::{Ref, Refs}; use super::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE, ReadParams, WriteParams}; use crate::dataset::branch_location::BranchLocation; @@ -37,6 +39,8 @@ pub struct DatasetBuilder { /// Metadata cache size for the fragment metadata. If it is zero, metadata /// cache is disabled. metadata_cache_size_bytes: usize, + /// Custom index cache backend. If set, overrides `index_cache_size_bytes`. + index_cache_backend: Option>, /// Optional pre-loaded manifest to avoid loading it again. manifest: Option, session: Option>, @@ -73,6 +77,7 @@ impl DatasetBuilder { Self { index_cache_size_bytes: DEFAULT_INDEX_CACHE_SIZE, metadata_cache_size_bytes: DEFAULT_METADATA_CACHE_SIZE, + index_cache_backend: None, table_uri: table_uri.as_ref().to_string(), options: ObjectStoreParams::default(), commit_handler: None, @@ -177,6 +182,15 @@ impl DatasetBuilder { self } + /// Use a custom index cache backend. + /// + /// When set, this overrides `with_index_cache_size_bytes` — the custom + /// backend is responsible for its own capacity management. + pub fn with_index_cache_backend(mut self, backend: Arc) -> Self { + self.index_cache_backend = Some(backend); + self + } + /// Set the cache size for indices. Set to zero, to disable the cache. #[deprecated(since = "0.30.0", note = "Use `with_index_cache_size_bytes` instead")] pub fn with_index_cache_size(mut self, cache_size: usize) -> Self { @@ -576,13 +590,21 @@ impl DatasetBuilder { } } + let index_cache_backend = self.index_cache_backend.take(); let session = match self.session.as_ref() { Some(session) => session.clone(), - None => Arc::new(Session::new( - self.index_cache_size_bytes, - self.metadata_cache_size_bytes, - Default::default(), - )), + None => match index_cache_backend { + Some(backend) => Arc::new(Session::with_index_cache_backend( + backend, + self.metadata_cache_size_bytes, + Default::default(), + )), + None => Arc::new(Session::new( + self.index_cache_size_bytes, + self.metadata_cache_size_bytes, + Default::default(), + )), + }, }; let target_ref = self.version.clone(); diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 5be98a9b23d..986fe8a8443 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1879,6 +1879,10 @@ impl CacheKey for FileMetadataCacheKey { fn key(&self) -> std::borrow::Cow<'_, str> { "".into() } + + fn type_name() -> &'static str { + "FileMetadata" + } } impl From for Fragment { diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index fdc91760a2e..a1f41ead087 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -110,6 +110,10 @@ impl UnsizedCacheKey for ScalarIndexCacheKey<'_> { self.uuid.into() } } + + fn type_name() -> &'static str { + "ScalarIndex" + } } #[derive(Debug, Clone)] @@ -134,6 +138,10 @@ impl UnsizedCacheKey for VectorIndexCacheKey<'_> { self.uuid.into() } } + + fn type_name() -> &'static str { + "VectorIndex" + } } #[derive(Debug, Clone)] @@ -158,6 +166,10 @@ impl CacheKey for FragReuseIndexCacheKey<'_> { self.uuid.into() } } + + fn type_name() -> &'static str { + "FragReuseIndex" + } } #[derive(Debug, Clone)] @@ -182,6 +194,10 @@ impl CacheKey for MemWalCacheKey<'_> { self.uuid.to_string().into() } } + + fn type_name() -> &'static str { + "MemWalIndex" + } } // Whether to auto-migrate a dataset when we encounter corruption. diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 621773c60c0..cc243eac887 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -123,6 +123,10 @@ impl UnsizedCacheKey for LegacyIVFPartitionKey { fn key(&self) -> std::borrow::Cow<'_, str> { format!("ivf-{}", self.partition_id).into() } + + fn type_name() -> &'static str { + "LegacyIVFPartition" + } } /// IVF Index. diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 0bdc0389648..5da12b687a7 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -96,6 +96,12 @@ impl CacheKey for IVFPartit fn key(&self) -> std::borrow::Cow<'_, str> { format!("ivf-{}", self.partition_id).into() } + + fn type_name() -> &'static str { + // Using type_name is safe here: the impl is in the same crate as the + // types, so the monomorphized pointer is consistent. + std::any::type_name::>() + } } /// IVF Index. diff --git a/rust/lance/src/lib.rs b/rust/lance/src/lib.rs index 934be0e519c..b9c8c7a4f5e 100644 --- a/rust/lance/src/lib.rs +++ b/rust/lance/src/lib.rs @@ -72,7 +72,7 @@ use arrow_schema::DataType; use dataset::builder::DatasetBuilder; pub use lance_core::datatypes; -pub use lance_core::{Error, Result}; +pub use lance_core::{Error, Result, cache}; use std::sync::LazyLock; pub mod arrow; diff --git a/rust/lance/src/session.rs b/rust/lance/src/session.rs index c67345fba32..b032cbaa15e 100644 --- a/rust/lance/src/session.rs +++ b/rust/lance/src/session.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use deepsize::DeepSizeOf; -use lance_core::cache::LanceCache; +use lance_core::cache::{CacheBackend, LanceCache}; use lance_core::{Error, Result}; use lance_index::IndexType; use lance_io::object_store::ObjectStoreRegistry; @@ -17,7 +17,7 @@ use crate::session::index_caches::GlobalIndexCache; use self::index_extension::IndexExtension; pub(crate) mod caches; -pub(crate) mod index_caches; +pub mod index_caches; pub(crate) mod index_extension; /// A user session holds the runtime state for a [`crate::Dataset`] @@ -77,11 +77,7 @@ impl std::fmt::Debug for Session { ) .field( "file_metadata_cache", - &format!( - "LanceCache(items={}, size_bytes={})", - self.metadata_cache.0.approx_size(), - self.metadata_cache.0.approx_size_bytes(), - ), + &format!("LanceCache(items={})", self.metadata_cache.0.approx_size(),), ) .field( "index_extensions", @@ -114,6 +110,23 @@ impl Session { } } + /// Create a session with a custom index cache backend. + /// + /// The provided backend will be used for caching index data. The metadata + /// cache will use the default Moka-based backend with the given capacity. + pub fn with_index_cache_backend( + index_cache_backend: Arc, + metadata_cache_size: usize, + store_registry: Arc, + ) -> Self { + Self { + index_cache: GlobalIndexCache(LanceCache::with_backend(index_cache_backend)), + metadata_cache: GlobalMetadataCache(LanceCache::with_capacity(metadata_cache_size)), + index_extensions: HashMap::new(), + store_registry, + } + } + /// Register a new index extension. /// /// A name can only be registered once per type of index extension. @@ -182,6 +195,11 @@ impl Session { self.store_registry.clone() } + /// Get a reference to the raw metadata cache (for use in index reconstruction). + pub fn file_metadata_cache(&self) -> &LanceCache { + &self.metadata_cache.0 + } + /// Fetch statistics for the metadata cache pub async fn metadata_cache_stats(&self) -> lance_core::cache::CacheStats { self.metadata_cache.0.stats().await @@ -206,7 +224,21 @@ impl Default for Session { #[cfg(test)] mod tests { use super::*; + use lance_core::cache::UnsizedCacheKey; use lance_index::vector::VectorIndex; + use std::borrow::Cow; + + struct TestUnsizedKey(&'static str); + impl UnsizedCacheKey for TestUnsizedKey { + type ValueType = dyn VectorIndex; + fn key(&self) -> Cow<'_, str> { + Cow::Borrowed(self.0) + } + + fn type_name() -> &'static str { + "TestUnsized" + } + } #[tokio::test] async fn test_disable_index_cache() { @@ -214,7 +246,7 @@ mod tests { assert!( no_cache .index_cache - .get_unsized::("abc") + .get_unsized_with_key(&TestUnsizedKey("abc")) .await .is_none() ); diff --git a/rust/lance/src/session/caches.rs b/rust/lance/src/session/caches.rs index 67c684c98de..55f78a5068f 100644 --- a/rust/lance/src/session/caches.rs +++ b/rust/lance/src/session/caches.rs @@ -75,7 +75,6 @@ pub struct ManifestKey<'a> { impl CacheKey for ManifestKey<'_> { type ValueType = Manifest; - fn key(&self) -> Cow<'_, str> { if let Some(e_tag) = self.e_tag { Cow::Owned(format!("manifest/{}/{}", self.version, e_tag)) @@ -83,6 +82,9 @@ impl CacheKey for ManifestKey<'_> { Cow::Owned(format!("manifest/{}", self.version)) } } + fn type_name() -> &'static str { + "Manifest" + } } #[derive(Debug)] @@ -92,10 +94,12 @@ pub struct TransactionKey { impl CacheKey for TransactionKey { type ValueType = Transaction; - fn key(&self) -> Cow<'_, str> { Cow::Owned(format!("txn/{}", self.version)) } + fn type_name() -> &'static str { + "Transaction" + } } #[derive(Debug)] @@ -106,7 +110,6 @@ pub struct DeletionFileKey<'a> { impl CacheKey for DeletionFileKey<'_> { type ValueType = DeletionVector; - fn key(&self) -> Cow<'_, str> { Cow::Owned(format!( "deletion/{}/{}/{}/{}", @@ -116,6 +119,9 @@ impl CacheKey for DeletionFileKey<'_> { self.deletion_file.file_type.suffix() )) } + fn type_name() -> &'static str { + "DeletionVector" + } } #[derive(Debug)] @@ -125,10 +131,12 @@ pub struct RowAddrMaskKey { impl CacheKey for RowAddrMaskKey { type ValueType = RowAddrMask; - fn key(&self) -> Cow<'_, str> { Cow::Owned(format!("row_addr_mask/{}", self.version)) } + fn type_name() -> &'static str { + "RowAddrMask" + } } #[derive(Debug)] @@ -138,10 +146,12 @@ pub struct RowIdIndexKey { impl CacheKey for RowIdIndexKey { type ValueType = RowIdIndex; - fn key(&self) -> Cow<'_, str> { Cow::Owned(format!("row_id_index/{}", self.version)) } + fn type_name() -> &'static str { + "RowIdIndex" + } } #[derive(Debug)] @@ -151,10 +161,12 @@ pub struct RowIdSequenceKey { impl CacheKey for RowIdSequenceKey { type ValueType = RowIdSequence; - fn key(&self) -> Cow<'_, str> { Cow::Owned(format!("row_id_sequence/{}", self.fragment_id)) } + fn type_name() -> &'static str { + "RowIdSequence" + } } impl DSMetadataCache { diff --git a/rust/lance/src/session/index_caches.rs b/rust/lance/src/session/index_caches.rs index d9578d43112..43443b5dd34 100644 --- a/rust/lance/src/session/index_caches.rs +++ b/rust/lance/src/session/index_caches.rs @@ -88,6 +88,10 @@ impl CacheKey for FragReuseIndexKey<'_> { fn key(&self) -> Cow<'_, str> { Cow::Owned(format!("frag_reuse/{}", self.uuid)) } + + fn type_name() -> &'static str { + "FragReuseIndex" + } } #[derive(Debug)] @@ -101,6 +105,10 @@ impl CacheKey for IndexMetadataKey { fn key(&self) -> Cow<'_, str> { Cow::Owned(self.version.to_string()) } + + fn type_name() -> &'static str { + "Vec" + } } pub struct ProstAny(pub Arc); @@ -128,4 +136,8 @@ impl CacheKey for ScalarIndexDetailsKey<'_> { fn key(&self) -> Cow<'_, str> { Cow::Owned(format!("type/{}", self.uuid)) } + + fn type_name() -> &'static str { + "ScalarIndexDetails" + } }