From 9bb9111ab0c726773c39d15db7f53c06d2182b07 Mon Sep 17 00:00:00 2001 From: Sesh Nalla Date: Sun, 19 Apr 2026 13:48:59 -0400 Subject: [PATCH] feat(executor): REDIS_MAXMEMORY_MB sweeper for memory-mode tenants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves the OOM crashloops in single-pod REDIS_STORE_TYPE=memory tenants (redis-cache-v2, redis-cache-edge, redis-session-store-v2/edge, redis-analytics-v2/edge), whose datasets grew without bound until the Kubernetes memory limit killed the pod. The existing ServerConfig exposed maxmemory as a CONFIG GET/SET knob but never enforced it — writes just kept pushing into the HashMap forever. This ships an optional, additive periodic sweeper: - `REDIS_MAXMEMORY_MB` env var (default unset → feature disabled, no behavior change). A value of 0 also disables the feature; malformed values are logged at WARN and treated as disabled. - `REDIS_MAXMEMORY_CHECK_SECS` env var (default 5 s). - When approximate process-wide usage exceeds the cap, the sweeper evicts arbitrary keys (taken in the key map's iteration order — not a true random sample) across all shards down to 90% of cap (headroom so we don't oscillate at the boundary) and logs a structured WARN with before/after byte counts for observability. - Byte estimates come from new `approx_bytes()` methods on each data type (SDS, List, Set, Hash, SortedSet) that sum payload + struct overhead — tuned for eviction decisions, not a precise `INFO memory` figure. - Expired keys are swept first, so memory still held by already-expired keys is reclaimed before any live key is evicted. The public entrypoint lives in a new `src/redis/executor/memory_ops.rs` module (additive; existing executor paths untouched) and is exposed at the shard boundary via two new messages on both `ShardMessage` and `ReplicatedShardMessage`. Both paths dispatch to the same `CommandExecutor::{approx_used_bytes, evict_random_until_below}` helpers. No existing write-path changes → zero regression risk on the hot path. The feature is opt-in via env var, so deploying this image with the env var unset is a no-op. 
Follow-up work (separate PR) to add proper Redis-compatible maxmemory-policy semantics (allkeys-lru, volatile-ttl, etc.) and incremental byte tracking so sweeps are O(1) instead of O(N). Test plan: - cargo test --lib redis::executor::memory_ops:: --release (5 pass) - cargo test --lib --release (531 pass, no regressions) - Deploy to libstream-dev with REDIS_MAXMEMORY_MB=800 on affected pods; verify restart counts stop climbing --- src/bin/server_persistent.rs | 70 ++++++ src/production/replicated_shard_actor.rs | 56 +++++ src/production/replicated_state.rs | 26 +++ src/production/sharded_actor.rs | 80 +++++++ src/redis/data/hash.rs | 15 ++ src/redis/data/list.rs | 9 + src/redis/data/sds.rs | 16 ++ src/redis/data/set.rs | 12 + src/redis/data/sorted_set.rs | 14 ++ src/redis/data/value.rs | 20 ++ src/redis/executor/memory_ops.rs | 278 +++++++++++++++++++++++ src/redis/executor/mod.rs | 1 + 12 files changed, 597 insertions(+) create mode 100644 src/redis/executor/memory_ops.rs diff --git a/src/bin/server_persistent.rs b/src/bin/server_persistent.rs index 0035fb7..a43a447 100644 --- a/src/bin/server_persistent.rs +++ b/src/bin/server_persistent.rs @@ -339,6 +339,28 @@ impl ClusterConfig { /// - Applies received deltas via CRDT merge (idempotent) /// /// Message framing: [4 bytes big-endian length][JSON payload] +/// Parse the `REDIS_MAXMEMORY_MB` env var into a byte cap. Returns `None` +/// when the var is unset or parses to zero (the "disabled" sentinel, +/// matching Redis's own `maxmemory 0` default meaning "unlimited"). +/// +/// Surfaces a WARN on malformed values so operators learn fast if they +/// typo the env var, rather than getting silent "feature not active." 
+fn parse_maxmemory_env() -> Option { + let raw = std::env::var("REDIS_MAXMEMORY_MB").ok()?; + match raw.trim().parse::() { + Ok(0) => None, + Ok(mb) => mb.checked_mul(1024).and_then(|kb| kb.checked_mul(1024)), + Err(e) => { + warn!( + raw = %raw, + error = %e, + "Ignoring malformed REDIS_MAXMEMORY_MB; memory sweeper disabled" + ); + None + } + } +} + async fn start_gossip_listener( port: u16, state: Arc, @@ -782,6 +804,54 @@ async fn main() -> Result<(), Box> { info!("Gossip replication started"); } + // ── Memory sweeper (optional) ──────────────────────────────────────── + // + // When `REDIS_MAXMEMORY_MB` is set, spawn a background task that + // periodically checks process-wide approximate usage and evicts random + // keys when over the cap. Purpose: prevent memory-mode single-pod + // tenants from OOMing at the container limit (see + // `dd/issue-memory-tenant-eviction.md`). + // + // Default (env var unset or 0): no sweeper, no behavior change. + if let Some(maxmem_bytes) = parse_maxmemory_env() { + let interval_secs = std::env::var("REDIS_MAXMEMORY_CHECK_SECS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(5); + let target_bytes = maxmem_bytes.saturating_mul(9).saturating_div(10); + info!( + "Memory sweeper enabled: max={} MB, target={} MB (90%), interval={}s", + maxmem_bytes / (1024 * 1024), + target_bytes / (1024 * 1024), + interval_secs + ); + + let sweeper_state = state.clone(); + tokio::spawn(async move { + let mut ticker = + tokio::time::interval(std::time::Duration::from_secs(interval_secs)); + // Skip the immediate first tick so startup isn't slowed. 
+ ticker.tick().await; + loop { + ticker.tick().await; + let used = sweeper_state.approx_used_bytes_all_shards().await; + if used > maxmem_bytes { + let evicted = + sweeper_state.evict_memory_all_shards(target_bytes).await; + let post = sweeper_state.approx_used_bytes_all_shards().await; + warn!( + used_bytes = used, + max_bytes = maxmem_bytes, + target_bytes, + evicted, + post_eviction_bytes = post, + "Memory sweeper evicted keys to stay under maxmemory cap" + ); + } + } + }); + } + println!("Starting server..."); println!(); diff --git a/src/production/replicated_shard_actor.rs b/src/production/replicated_shard_actor.rs index 833319d..e5109d7 100644 --- a/src/production/replicated_shard_actor.rs +++ b/src/production/replicated_shard_actor.rs @@ -41,6 +41,17 @@ pub enum ReplicatedShardMessage { current_time: VirtualTime, response: oneshot::Sender, }, + /// Query approximate in-memory byte usage for this shard. Used by the + /// `REDIS_MAXMEMORY_MB` sweeper. + MemoryStats { + response: oneshot::Sender, + }, + /// Evict random keys until approximate usage drops below `target_bytes`. + /// Returns the number of keys evicted. + EvictMemory { + target_bytes: usize, + response: oneshot::Sender, + }, /// Get snapshot of replicated keys for checkpointing GetSnapshot { response: oneshot::Sender< @@ -116,6 +127,35 @@ impl ReplicatedShardHandle { } /// Evict expired keys + /// Query approximate in-memory usage for this shard. + pub async fn memory_stats(&self) -> usize { + let (tx, rx) = oneshot::channel(); + if self + .tx + .send(ReplicatedShardMessage::MemoryStats { response: tx }) + .is_err() + { + return 0; + } + rx.await.unwrap_or(0) + } + + /// Evict random keys until this shard's usage drops below `target_bytes`. 
+ pub async fn evict_memory(&self, target_bytes: usize) -> usize { + let (tx, rx) = oneshot::channel(); + if self + .tx + .send(ReplicatedShardMessage::EvictMemory { + target_bytes, + response: tx, + }) + .is_err() + { + return 0; + } + rx.await.unwrap_or(0) + } + pub async fn evict_expired(&self, current_time: VirtualTime) -> usize { let (tx, rx) = oneshot::channel(); if self @@ -246,6 +286,22 @@ impl ReplicatedShardActor { let _ = response.send(evicted); } + ReplicatedShardMessage::MemoryStats { response } => { + let used = self.executor.approx_used_bytes(); + let _ = response.send(used); + } + + ReplicatedShardMessage::EvictMemory { + target_bytes, + response, + } => { + let evicted = self.executor.evict_random_until_below(target_bytes); + let _ = response.send(evicted); + + #[cfg(debug_assertions)] + self.verify_invariants(); + } + ReplicatedShardMessage::GetSnapshot { response } => { let snapshot = self.replica_state.replicated_keys.clone(); let _ = response.send(snapshot); diff --git a/src/production/replicated_state.rs b/src/production/replicated_state.rs index af01346..590a276 100644 --- a/src/production/replicated_state.rs +++ b/src/production/replicated_state.rs @@ -428,6 +428,32 @@ impl ReplicatedShardedState { results.into_iter().sum() } + /// Approximate in-memory byte usage across all shards. See + /// `memory_ops::approx_used_bytes` for semantics. Used by the + /// memory-sweeper task spawned in `server_persistent`. + pub async fn approx_used_bytes_all_shards(&self) -> usize { + let futures: Vec<_> = self + .shards + .iter() + .map(|shard| shard.memory_stats()) + .collect(); + let results = futures::future::join_all(futures).await; + results.into_iter().fold(0usize, |a, b| a.saturating_add(b)) + } + + /// Evict random keys across all shards until process-wide usage drops + /// below `target_bytes`. Returns the total evicted. 
+ pub async fn evict_memory_all_shards(&self, target_bytes: usize) -> usize { + let per_shard_target = target_bytes.saturating_div(self.shards.len().max(1)); + let futures: Vec<_> = self + .shards + .iter() + .map(|shard| shard.evict_memory(per_shard_target)) + .collect(); + let results = futures::future::join_all(futures).await; + results.into_iter().fold(0usize, |a, b| a.saturating_add(b)) + } + /// Get the time source pub fn time_source(&self) -> &T { &self.time_source diff --git a/src/production/sharded_actor.rs b/src/production/sharded_actor.rs index df7cde6..0d1db5e 100644 --- a/src/production/sharded_actor.rs +++ b/src/production/sharded_actor.rs @@ -85,6 +85,17 @@ pub enum ShardMessage { virtual_time: VirtualTime, response_tx: oneshot::Sender, }, + /// Query approximate in-memory byte usage for this shard. Used by the + /// `REDIS_MAXMEMORY_MB` sweeper to decide whether to trigger eviction. + MemoryStats { + response_tx: oneshot::Sender, + }, + /// Evict random keys until the shard's `approx_used_bytes()` drops + /// below `target_bytes`. Returns the number of keys evicted. 
+ EvictMemory { + target_bytes: usize, + response_tx: oneshot::Sender, + }, /// Fast path for GET - avoids Command enum overhead FastGet { key: bytes::Bytes, @@ -205,6 +216,17 @@ impl ShardActor { let evicted = self.executor.evict_expired_direct(virtual_time); let _ = response_tx.send(evicted); } + ShardMessage::MemoryStats { response_tx } => { + let used = self.executor.approx_used_bytes(); + let _ = response_tx.send(used); + } + ShardMessage::EvictMemory { + target_bytes, + response_tx, + } => { + let evicted = self.executor.evict_random_until_below(target_bytes); + let _ = response_tx.send(evicted); + } ShardMessage::FastGet { key, response_tx } => { // Fast path: direct GET without Command enum overhead let key_str = unsafe { std::str::from_utf8_unchecked(&key) }; @@ -420,6 +442,34 @@ impl ShardHandle { response_rx.await.unwrap_or(0) } + + /// Query this shard's approximate byte usage. Returns 0 if the shard + /// is unreachable — callers should treat that the same as "don't evict" + /// so a routing blip doesn't wipe the dataset. + #[inline] + async fn memory_stats(&self) -> usize { + let (response_tx, response_rx) = oneshot::channel(); + let msg = ShardMessage::MemoryStats { response_tx }; + if self.tx.send(msg).is_err() { + return 0; + } + response_rx.await.unwrap_or(0) + } + + /// Evict random keys in this shard until usage drops below + /// `target_bytes`. Returns the count evicted. + #[inline] + async fn evict_memory(&self, target_bytes: usize) -> usize { + let (response_tx, response_rx) = oneshot::channel(); + let msg = ShardMessage::EvictMemory { + target_bytes, + response_tx, + }; + if self.tx.send(msg).is_err() { + return 0; + } + response_rx.await.unwrap_or(0) + } } /// Hash key string to shard index @@ -876,6 +926,36 @@ impl ShardedActorState { total } + /// Sum approximate byte usage across every shard. Used by the + /// `REDIS_MAXMEMORY_MB` sweeper (`server_persistent`) to decide when + /// to trigger eviction. 
+ pub async fn approx_used_bytes_all_shards(&self) -> usize { + let mut total = 0usize; + for shard in self.shards.iter() { + total = total.saturating_add(shard.memory_stats().await); + } + total + } + + /// Evict random keys across all shards until process-wide approximate + /// usage drops below `target_bytes`. Returns the total keys evicted. + /// + /// The target is divided evenly across shards: each shard is told to + /// evict to `target_bytes / num_shards`. This is a simplification — + /// under uneven load distribution one shard may hold disproportionate + /// memory and the uniform split will over-evict it. For the + /// single-shard memory-mode tenants that motivated this feature + /// (`redis-cache-v2`, `redis-cache-edge`, etc.) `num_shards == 1` so + /// the split is a no-op. + pub async fn evict_memory_all_shards(&self, target_bytes: usize) -> usize { + let per_shard_target = target_bytes.saturating_div(self.shards.len().max(1)); + let mut total = 0usize; + for shard in self.shards.iter() { + total = total.saturating_add(shard.evict_memory(per_shard_target).await); + } + total + } + /// Fast path GET - bypasses Command enum for lower overhead /// /// Uses bytes::Bytes to avoid String allocation. The key is hashed diff --git a/src/redis/data/hash.rs b/src/redis/data/hash.rs index 9bc7a42..48db670 100644 --- a/src/redis/data/hash.rs +++ b/src/redis/data/hash.rs @@ -143,6 +143,21 @@ impl RedisHash { self.fields.is_empty() } + /// Approximate in-memory byte cost of this hash — sum of field-name and + /// value byte lengths plus per-entry hash-map overhead. Used by the + /// memory sweeper. 
+ pub fn approx_bytes(&self) -> usize { + let per_entry_overhead = std::mem::size_of::().saturating_mul(2); + self.fields + .iter() + .map(|(k, v)| { + k.len() + .saturating_add(v.len()) + .saturating_add(per_entry_overhead) + }) + .fold(0usize, |acc, n| acc.saturating_add(n)) + } + pub fn keys(&self) -> Vec { self.fields.keys().map(|k| SDS::from_str(k)).collect() } diff --git a/src/redis/data/list.rs b/src/redis/data/list.rs index 1d36d35..4f9862e 100644 --- a/src/redis/data/list.rs +++ b/src/redis/data/list.rs @@ -176,6 +176,15 @@ impl RedisList { self.items.is_empty() } + /// Approximate in-memory byte cost of this list. See `SDS::approx_bytes` + /// for semantics — used by the memory sweeper. + pub fn approx_bytes(&self) -> usize { + self.items + .iter() + .map(|s| s.approx_bytes()) + .fold(0usize, |acc, n| acc.saturating_add(n)) + } + pub fn range(&self, start: isize, stop: isize) -> Vec { let len = self.items.len() as isize; let start = if start < 0 { diff --git a/src/redis/data/sds.rs b/src/redis/data/sds.rs index 36ed354..eaf5378 100644 --- a/src/redis/data/sds.rs +++ b/src/redis/data/sds.rs @@ -122,6 +122,22 @@ impl SDS { self.len() == 0 } + /// Approximate in-memory byte cost of this SDS, including the enum + /// discriminator and any heap allocation overhead. Used by the memory + /// sweeper to estimate key size for eviction decisions. Not a precise + /// `Layout`-level measurement — good enough to drive eviction away from + /// a memory-limit boundary. + #[inline] + pub fn approx_bytes(&self) -> usize { + match self { + // Inline keeps its bytes on the stack but we're asked about the + // logical string size, so return len(). + SDS::Inline { len, .. } => *len as usize, + // Heap: the data bytes + the Vec's own struct cost. 
+ SDS::Heap(data) => data.len().saturating_add(std::mem::size_of::>()), + } + } + #[inline] pub fn as_bytes(&self) -> &[u8] { match self { diff --git a/src/redis/data/set.rs b/src/redis/data/set.rs index 87c5ec5..f89b220 100644 --- a/src/redis/data/set.rs +++ b/src/redis/data/set.rs @@ -135,6 +135,18 @@ impl RedisSet { self.members.is_empty() } + /// Approximate in-memory byte cost of this set — sum of member-string + /// byte lengths plus hash-set overhead per entry. Used by the memory + /// sweeper to drive eviction decisions. + pub fn approx_bytes(&self) -> usize { + // Per-entry overhead: string heap + HashSet bucket + let per_entry_overhead = std::mem::size_of::(); + self.members + .iter() + .map(|s| s.len().saturating_add(per_entry_overhead)) + .fold(0usize, |acc, n| acc.saturating_add(n)) + } + /// SPOP: Remove and return a random member from the set /// Returns None if the set is empty pub fn pop(&mut self) -> Option { diff --git a/src/redis/data/sorted_set.rs b/src/redis/data/sorted_set.rs index 3a124d6..7c9673d 100644 --- a/src/redis/data/sorted_set.rs +++ b/src/redis/data/sorted_set.rs @@ -209,6 +209,20 @@ impl RedisSortedSet { self.skiplist.len() } + /// Approximate in-memory byte cost of this sorted set — member strings, + /// per-entry hash-map and skip-list overhead, plus scores (f64). Used by + /// the memory sweeper. + pub fn approx_bytes(&self) -> usize { + // Each member lives in both the HashMap (key + f64) and the skiplist + // (node with back-pointers). Rough per-entry overhead ~ 64 bytes on + // 64-bit. Sum member string lengths + overhead. 
+ let per_entry_overhead = 64usize; + self.members + .keys() + .map(|m| m.len().saturating_add(per_entry_overhead)) + .fold(0usize, |acc, n| acc.saturating_add(n)) + } + pub fn is_empty(&self) -> bool { self.members.is_empty() } diff --git a/src/redis/data/value.rs b/src/redis/data/value.rs index fd5a0f2..b6ce34b 100644 --- a/src/redis/data/value.rs +++ b/src/redis/data/value.rs @@ -47,4 +47,24 @@ impl Value { _ => None, } } + + /// Approximate in-memory byte cost of this value. The enum discriminator + /// plus type-specific sums (see each data-type's `approx_bytes`). Used + /// by `CommandExecutor::approx_used_bytes` to drive the optional + /// `REDIS_MAXMEMORY_MB` memory sweeper. + /// + /// Not a precise `Layout` measurement — an estimate good enough to + /// decide "are we getting close to the limit and need to evict?". + pub fn approx_bytes(&self) -> usize { + let variant = std::mem::size_of::(); + let payload = match self { + Value::String(s) => s.approx_bytes(), + Value::List(l) => l.approx_bytes(), + Value::Set(s) => s.approx_bytes(), + Value::Hash(h) => h.approx_bytes(), + Value::SortedSet(zs) => zs.approx_bytes(), + Value::Null => 0, + }; + variant.saturating_add(payload) + } } diff --git a/src/redis/executor/memory_ops.rs b/src/redis/executor/memory_ops.rs new file mode 100644 index 0000000..e323ff1 --- /dev/null +++ b/src/redis/executor/memory_ops.rs @@ -0,0 +1,278 @@ +//! Memory accounting and opt-in eviction for `CommandExecutor`. +//! +//! Redis exposes `maxmemory` + `maxmemory-policy` as CONFIG params, and +//! until this module shipped the values were stored but never enforced — +//! memory-mode single-pod tenants on `libstream-dev` / `libstream-edge` +//! OOMed at the container limit because writes accumulated unbounded (see +//! `dd/issue-memory-tenant-eviction.md` in the consumer repo). +//! +//! This module adds two read-only helpers on `CommandExecutor` plus a +//! random-eviction routine. The public surface is intentionally small and +//! 
additive: the memory sweeper runs on a periodic task out of +//! `server_persistent.rs`, so no existing write path is modified. +//! +//! Coverage, not precision: `approx_used_bytes` sums best-effort estimates +//! from each data type's `approx_bytes` helper. It's tuned to drive +//! eviction decisions away from a memory-limit boundary, not to back a +//! client-facing `INFO memory` stat. A more precise implementation +//! would track bytes incrementally on every mutation — orders of magnitude +//! more code change, deferred to a follow-up. + +use super::CommandExecutor; + +impl CommandExecutor { + /// Sum approximate byte costs across every key currently in the data + /// map. See `Value::approx_bytes` for the per-key estimate. + /// + /// Complexity: O(N) over the dataset. The sweeper calls this at the + /// configured interval (default 5 s) which is fine for hot tenants + /// with O(100K) keys; for larger tenants the interval should be + /// tuned up so the sweep cost stays under a few percent of CPU. + pub fn approx_used_bytes(&self) -> usize { + let map_overhead_per_entry = std::mem::size_of::() + .saturating_add(std::mem::size_of::()); + + self.data + .iter() + .map(|(k, v)| { + k.len() + .saturating_add(v.approx_bytes()) + .saturating_add(map_overhead_per_entry) + }) + .fold(0usize, |acc, n| acc.saturating_add(n)) + } + + /// Evict random keys until `approx_used_bytes()` drops below + /// `target_bytes`, and return the count evicted. Caller specifies the + /// target; the sweeper typically picks `maxmemory * 0.9` so we evict + /// down to a comfortable headroom instead of oscillating right at the + /// boundary. + /// + /// "Random" here means insertion-order-dependent (whatever `.keys()` + /// yields first) — not cryptographically random, but good enough to + /// avoid biasing toward any particular key shape. Expired keys are + /// evicted first so we don't waste live keys when dead ones still + /// occupy memory. 
+ /// + /// ## Postcondition + /// Either `approx_used_bytes() <= target_bytes`, or the dataset is + /// empty (the caller should log if the latter — it means the target + /// can't be reached even with an empty store, which points at a leak + /// elsewhere). + pub fn evict_random_until_below(&mut self, target_bytes: usize) -> usize { + debug_assert!( + target_bytes < usize::MAX, + "Precondition: target must be a real number" + ); + + // First pass: sweep out expired keys for free. If that alone is + // enough we skip random eviction. + self.evict_expired_keys(); + if self.approx_used_bytes() <= target_bytes { + return 0; + } + + let mut evicted: usize = 0; + + // Collect candidates up front because we can't mutate `self.data` + // while iterating it. We evict in chunks of ~128 keys per + // measurement so we don't re-scan the entire map on every single + // eviction — that would be O(N^2) in the worst case. + const EVICT_CHUNK: usize = 128; + + loop { + let used = self.approx_used_bytes(); + if used <= target_bytes { + break; + } + if self.data.is_empty() { + // Can't reach target; caller should log. + break; + } + + let victims: Vec = self + .data + .keys() + .take(EVICT_CHUNK) + .cloned() + .collect(); + if victims.is_empty() { + break; + } + + for key in &victims { + self.data.remove(key); + self.expirations.remove(key); + evicted = evicted.saturating_add(1); + } + } + + debug_assert!( + self.approx_used_bytes() <= target_bytes || self.data.is_empty(), + "Postcondition: either below target or dataset empty" + ); + + evicted + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::redis::command::Command; + use crate::redis::resp::RespValue; + + /// Baseline: an empty executor reports a small but non-zero byte count + /// (struct overhead) and zero data contribution. 
+ #[test] + fn approx_used_bytes_empty_is_zero_data() { + let exec = CommandExecutor::new(); + assert_eq!( + exec.approx_used_bytes(), + 0, + "Empty executor has no data contribution" + ); + } + + /// After writing N string keys, `approx_used_bytes` should scale with + /// the stored value sizes. We don't assert a precise figure (the helper + /// is explicitly an estimate) — just that it grows monotonically and + /// stays on the right order of magnitude. + #[test] + fn approx_used_bytes_grows_monotonically_with_writes() { + let mut exec = CommandExecutor::new(); + let baseline = exec.approx_used_bytes(); + + // Write 100 keys with ~1 KB values each → ~100 KB of payload + for i in 0..100 { + let key = format!("k{i:05}"); + let value = crate::redis::data::SDS::new("x".repeat(1024).into_bytes()); + let resp = exec.execute(&Command::Set { + key: key.clone(), + value, + ex: None, + px: None, + exat: None, + pxat: None, + nx: false, + xx: false, + keepttl: false, + get: false, + }); + assert!(matches!(resp, RespValue::SimpleString(_)), "SET should succeed"); + } + + let after = exec.approx_used_bytes(); + assert!( + after > baseline, + "Usage must grow after writes (baseline={baseline}, after={after})" + ); + // Lower bound: at least the value bytes (100 × 1024 = 102_400). + assert!( + after >= 100 * 1024, + "Usage must be at least the value payload; got {after}" + ); + } + + /// Evicting with an already-satisfied target does nothing and returns 0. 
+ #[test] + fn evict_noop_when_below_target() { + let mut exec = CommandExecutor::new(); + // Populate a bit + for i in 0..10 { + exec.execute(&Command::Set { + key: format!("k{i}"), + value: crate::redis::data::SDS::new(b"v".to_vec()), + ex: None, + px: None, + exat: None, + pxat: None, + nx: false, + xx: false, + keepttl: false, + get: false, + }); + } + // Target far above usage + let evicted = exec.evict_random_until_below(usize::MAX); + assert_eq!(evicted, 0, "Below-target target must evict nothing"); + assert_eq!(exec.data.len(), 10, "Data must be untouched"); + } + + /// Driving the target to 0 must evict everything. + #[test] + fn evict_to_zero_drains_dataset() { + let mut exec = CommandExecutor::new(); + for i in 0..50 { + exec.execute(&Command::Set { + key: format!("k{i}"), + value: crate::redis::data::SDS::new("x".repeat(256).into_bytes()), + ex: None, + px: None, + exat: None, + pxat: None, + nx: false, + xx: false, + keepttl: false, + get: false, + }); + } + assert_eq!(exec.data.len(), 50); + + let evicted = exec.evict_random_until_below(0); + assert_eq!(evicted, 50, "All 50 keys must be evicted"); + assert!(exec.data.is_empty(), "Dataset must be empty"); + assert_eq!(exec.approx_used_bytes(), 0); + } + + /// Realistic: 200 KB target, ~400 KB of data, expect eviction down to + /// roughly the target. The estimate is loose, so we accept up to a + /// small overshoot (a chunk's worth of eviction budget). 
+ #[test] + fn evict_to_partial_target_stops_near_threshold() { + let mut exec = CommandExecutor::new(); + // 400 × 1 KB = ~400 KB payload + for i in 0..400 { + exec.execute(&Command::Set { + key: format!("key-{i:04}"), + value: crate::redis::data::SDS::new("x".repeat(1024).into_bytes()), + ex: None, + px: None, + exat: None, + pxat: None, + nx: false, + xx: false, + keepttl: false, + get: false, + }); + } + let before = exec.approx_used_bytes(); + assert!(before > 200 * 1024, "Expected >200 KB of data, got {before}"); + + let evicted = exec.evict_random_until_below(200 * 1024); + assert!(evicted > 0, "Must have evicted something"); + + let after = exec.approx_used_bytes(); + // Allow a small overshoot because eviction runs in chunks of 128. + // After one chunk drops below target, the loop exits. + let max_acceptable = 200 * 1024 + exec.approx_used_bytes_per_chunk_slack(); + assert!( + after <= max_acceptable, + "Post-eviction usage {after} must be within chunk slack of 200 KB target" + ); + } + + impl CommandExecutor { + /// Test-only: approximate byte cost of one eviction chunk + /// (128 × average-key-size). Used as the allowed overshoot in the + /// "evict near threshold" test. + fn approx_used_bytes_per_chunk_slack(&self) -> usize { + let avg = if self.data.is_empty() { + 1024 // fallback + } else { + self.approx_used_bytes().saturating_div(self.data.len().max(1)) + }; + avg.saturating_mul(128) + } + } +} diff --git a/src/redis/executor/mod.rs b/src/redis/executor/mod.rs index 1db292a..76f5fdb 100644 --- a/src/redis/executor/mod.rs +++ b/src/redis/executor/mod.rs @@ -21,6 +21,7 @@ mod config_ops; mod hash_ops; mod key_ops; mod list_ops; +mod memory_ops; mod scan_ops; mod script_ops; mod set_ops;