Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/bin/server_persistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,28 @@ impl ClusterConfig {
/// - Applies received deltas via CRDT merge (idempotent)
///
/// Message framing: [4 bytes big-endian length][JSON payload]
/// Parse the `REDIS_MAXMEMORY_MB` env var into a byte cap. Returns `None`
/// when the var is unset or parses to zero (the "disabled" sentinel,
/// matching Redis's own `maxmemory 0` default meaning "unlimited").
///
/// Surfaces a WARN on malformed values so operators learn fast if they
/// typo the env var, rather than getting silent "feature not active."
fn parse_maxmemory_env() -> Option<usize> {
let raw = std::env::var("REDIS_MAXMEMORY_MB").ok()?;
match raw.trim().parse::<usize>() {
Ok(0) => None,
Ok(mb) => mb.checked_mul(1024).and_then(|kb| kb.checked_mul(1024)),
Err(e) => {
warn!(
raw = %raw,
error = %e,
"Ignoring malformed REDIS_MAXMEMORY_MB; memory sweeper disabled"
);
None
}
}
}

async fn start_gossip_listener(
port: u16,
state: Arc<ReplicatedShardedState>,
Expand Down Expand Up @@ -782,6 +804,54 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Gossip replication started");
}

// ── Memory sweeper (optional) ────────────────────────────────────────
//
// When `REDIS_MAXMEMORY_MB` is set, spawn a background task that
// periodically checks process-wide approximate usage and evicts random
// keys when over the cap. Purpose: prevent memory-mode single-pod
// tenants from OOMing at the container limit (see
// `dd/issue-memory-tenant-eviction.md`).
//
// Default (env var unset or 0): no sweeper, no behavior change.
if let Some(maxmem_bytes) = parse_maxmemory_env() {
let interval_secs = std::env::var("REDIS_MAXMEMORY_CHECK_SECS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(5);
let target_bytes = maxmem_bytes.saturating_mul(9).saturating_div(10);
info!(
"Memory sweeper enabled: max={} MB, target={} MB (90%), interval={}s",
maxmem_bytes / (1024 * 1024),
target_bytes / (1024 * 1024),
interval_secs
);

let sweeper_state = state.clone();
tokio::spawn(async move {
let mut ticker =
tokio::time::interval(std::time::Duration::from_secs(interval_secs));
// Skip the immediate first tick so startup isn't slowed.
ticker.tick().await;
loop {
ticker.tick().await;
let used = sweeper_state.approx_used_bytes_all_shards().await;
if used > maxmem_bytes {
let evicted =
sweeper_state.evict_memory_all_shards(target_bytes).await;
let post = sweeper_state.approx_used_bytes_all_shards().await;
warn!(
used_bytes = used,
max_bytes = maxmem_bytes,
target_bytes,
evicted,
post_eviction_bytes = post,
"Memory sweeper evicted keys to stay under maxmemory cap"
);
}
}
});
}

println!("Starting server...");
println!();

Expand Down
56 changes: 56 additions & 0 deletions src/production/replicated_shard_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ pub enum ReplicatedShardMessage {
current_time: VirtualTime,
response: oneshot::Sender<usize>,
},
/// Query approximate in-memory byte usage for this shard. Used by the
/// `REDIS_MAXMEMORY_MB` sweeper.
MemoryStats {
response: oneshot::Sender<usize>,
},
/// Evict random keys until approximate usage drops below `target_bytes`.
/// Returns the number of keys evicted.
EvictMemory {
target_bytes: usize,
response: oneshot::Sender<usize>,
},
/// Get snapshot of replicated keys for checkpointing
GetSnapshot {
response: oneshot::Sender<
Expand Down Expand Up @@ -116,6 +127,35 @@ impl ReplicatedShardHandle {
}

/// Evict expired keys
/// Query approximate in-memory usage for this shard.
pub async fn memory_stats(&self) -> usize {
let (tx, rx) = oneshot::channel();
if self
.tx
.send(ReplicatedShardMessage::MemoryStats { response: tx })
.is_err()
{
return 0;
}
rx.await.unwrap_or(0)
}

/// Evict random keys until this shard's usage drops below `target_bytes`.
pub async fn evict_memory(&self, target_bytes: usize) -> usize {
let (tx, rx) = oneshot::channel();
if self
.tx
.send(ReplicatedShardMessage::EvictMemory {
target_bytes,
response: tx,
})
.is_err()
{
return 0;
}
rx.await.unwrap_or(0)
}

pub async fn evict_expired(&self, current_time: VirtualTime) -> usize {
let (tx, rx) = oneshot::channel();
if self
Expand Down Expand Up @@ -246,6 +286,22 @@ impl ReplicatedShardActor {
let _ = response.send(evicted);
}

ReplicatedShardMessage::MemoryStats { response } => {
let used = self.executor.approx_used_bytes();
let _ = response.send(used);
}

ReplicatedShardMessage::EvictMemory {
target_bytes,
response,
} => {
let evicted = self.executor.evict_random_until_below(target_bytes);
let _ = response.send(evicted);

#[cfg(debug_assertions)]
self.verify_invariants();
}

ReplicatedShardMessage::GetSnapshot { response } => {
let snapshot = self.replica_state.replicated_keys.clone();
let _ = response.send(snapshot);
Expand Down
26 changes: 26 additions & 0 deletions src/production/replicated_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,32 @@ impl<T: TimeSource> ReplicatedShardedState<T> {
results.into_iter().sum()
}

/// Approximate in-memory byte usage across all shards. See
/// `memory_ops::approx_used_bytes` for semantics. Used by the
/// memory-sweeper task spawned in `server_persistent`.
pub async fn approx_used_bytes_all_shards(&self) -> usize {
let futures: Vec<_> = self
.shards
.iter()
.map(|shard| shard.memory_stats())
.collect();
let results = futures::future::join_all(futures).await;
results.into_iter().fold(0usize, |a, b| a.saturating_add(b))
}

/// Evict random keys across all shards until process-wide usage drops
/// below `target_bytes`. Returns the total evicted.
pub async fn evict_memory_all_shards(&self, target_bytes: usize) -> usize {
let per_shard_target = target_bytes.saturating_div(self.shards.len().max(1));
let futures: Vec<_> = self
.shards
.iter()
.map(|shard| shard.evict_memory(per_shard_target))
.collect();
let results = futures::future::join_all(futures).await;
results.into_iter().fold(0usize, |a, b| a.saturating_add(b))
}

/// Get the time source
pub fn time_source(&self) -> &T {
&self.time_source
Expand Down
80 changes: 80 additions & 0 deletions src/production/sharded_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ pub enum ShardMessage {
virtual_time: VirtualTime,
response_tx: oneshot::Sender<usize>,
},
/// Query approximate in-memory byte usage for this shard. Used by the
/// `REDIS_MAXMEMORY_MB` sweeper to decide whether to trigger eviction.
MemoryStats {
response_tx: oneshot::Sender<usize>,
},
/// Evict random keys until the shard's `approx_used_bytes()` drops
/// below `target_bytes`. Returns the number of keys evicted.
EvictMemory {
target_bytes: usize,
response_tx: oneshot::Sender<usize>,
},
/// Fast path for GET - avoids Command enum overhead
FastGet {
key: bytes::Bytes,
Expand Down Expand Up @@ -205,6 +216,17 @@ impl ShardActor {
let evicted = self.executor.evict_expired_direct(virtual_time);
let _ = response_tx.send(evicted);
}
ShardMessage::MemoryStats { response_tx } => {
let used = self.executor.approx_used_bytes();
let _ = response_tx.send(used);
}
ShardMessage::EvictMemory {
target_bytes,
response_tx,
} => {
let evicted = self.executor.evict_random_until_below(target_bytes);
let _ = response_tx.send(evicted);
}
ShardMessage::FastGet { key, response_tx } => {
// Fast path: direct GET without Command enum overhead
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
Expand Down Expand Up @@ -420,6 +442,34 @@ impl ShardHandle {

response_rx.await.unwrap_or(0)
}

/// Query this shard's approximate byte usage. Returns 0 if the shard
/// is unreachable — callers should treat that the same as "don't evict"
/// so a routing blip doesn't wipe the dataset.
#[inline]
async fn memory_stats(&self) -> usize {
let (response_tx, response_rx) = oneshot::channel();
let msg = ShardMessage::MemoryStats { response_tx };
if self.tx.send(msg).is_err() {
return 0;
}
response_rx.await.unwrap_or(0)
}

/// Evict random keys in this shard until usage drops below
/// `target_bytes`. Returns the count evicted.
#[inline]
async fn evict_memory(&self, target_bytes: usize) -> usize {
let (response_tx, response_rx) = oneshot::channel();
let msg = ShardMessage::EvictMemory {
target_bytes,
response_tx,
};
if self.tx.send(msg).is_err() {
return 0;
}
response_rx.await.unwrap_or(0)
}
}

/// Hash key string to shard index
Expand Down Expand Up @@ -876,6 +926,36 @@ impl<T: TimeSource> ShardedActorState<T> {
total
}

/// Sum approximate byte usage across every shard. Used by the
/// `REDIS_MAXMEMORY_MB` sweeper (`server_persistent`) to decide when
/// to trigger eviction.
pub async fn approx_used_bytes_all_shards(&self) -> usize {
let mut total = 0usize;
for shard in self.shards.iter() {
total = total.saturating_add(shard.memory_stats().await);
}
total
}

/// Evict random keys across all shards until process-wide approximate
/// usage drops below `target_bytes`. Returns the total keys evicted.
///
/// The target is divided evenly across shards: each shard is told to
/// evict to `target_bytes / num_shards`. This is a simplification —
/// under uneven load distribution one shard may hold disproportionate
/// memory and the uniform split will over-evict it. For the
/// single-shard memory-mode tenants that motivated this feature
/// (`redis-cache-v2`, `redis-cache-edge`, etc.) `num_shards == 1` so
/// the split is a no-op.
pub async fn evict_memory_all_shards(&self, target_bytes: usize) -> usize {
let per_shard_target = target_bytes.saturating_div(self.shards.len().max(1));
let mut total = 0usize;
for shard in self.shards.iter() {
total = total.saturating_add(shard.evict_memory(per_shard_target).await);
}
total
}

/// Fast path GET - bypasses Command enum for lower overhead
///
/// Uses bytes::Bytes to avoid String allocation. The key is hashed
Expand Down
15 changes: 15 additions & 0 deletions src/redis/data/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ impl RedisHash {
self.fields.is_empty()
}

/// Approximate in-memory byte cost of this hash — sum of field-name and
/// value byte lengths plus per-entry hash-map overhead. Used by the
/// memory sweeper.
pub fn approx_bytes(&self) -> usize {
let per_entry_overhead = std::mem::size_of::<String>().saturating_mul(2);
self.fields
.iter()
.map(|(k, v)| {
k.len()
.saturating_add(v.len())
.saturating_add(per_entry_overhead)
})
.fold(0usize, |acc, n| acc.saturating_add(n))
}

pub fn keys(&self) -> Vec<SDS> {
self.fields.keys().map(|k| SDS::from_str(k)).collect()
}
Expand Down
9 changes: 9 additions & 0 deletions src/redis/data/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ impl RedisList {
self.items.is_empty()
}

/// Approximate in-memory byte cost of this list. See `SDS::approx_bytes`
/// for semantics — used by the memory sweeper.
pub fn approx_bytes(&self) -> usize {
self.items
.iter()
.map(|s| s.approx_bytes())
.fold(0usize, |acc, n| acc.saturating_add(n))
}

pub fn range(&self, start: isize, stop: isize) -> Vec<SDS> {
let len = self.items.len() as isize;
let start = if start < 0 {
Expand Down
16 changes: 16 additions & 0 deletions src/redis/data/sds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ impl SDS {
self.len() == 0
}

/// Approximate in-memory byte cost of this SDS, including the enum
/// discriminator and any heap allocation overhead. Used by the memory
/// sweeper to estimate key size for eviction decisions. Not a precise
/// `Layout`-level measurement — good enough to drive eviction away from
/// a memory-limit boundary.
#[inline]
pub fn approx_bytes(&self) -> usize {
match self {
// Inline keeps its bytes on the stack but we're asked about the
// logical string size, so return len().
SDS::Inline { len, .. } => *len as usize,
// Heap: the data bytes + the Vec's own struct cost.
SDS::Heap(data) => data.len().saturating_add(std::mem::size_of::<Vec<u8>>()),
}
}

#[inline]
pub fn as_bytes(&self) -> &[u8] {
match self {
Expand Down
12 changes: 12 additions & 0 deletions src/redis/data/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ impl RedisSet {
self.members.is_empty()
}

/// Approximate in-memory byte cost of this set — sum of member-string
/// byte lengths plus hash-set overhead per entry. Used by the memory
/// sweeper to drive eviction decisions.
pub fn approx_bytes(&self) -> usize {
// Per-entry overhead: string heap + HashSet bucket
let per_entry_overhead = std::mem::size_of::<String>();
self.members
.iter()
.map(|s| s.len().saturating_add(per_entry_overhead))
.fold(0usize, |acc, n| acc.saturating_add(n))
}

/// SPOP: Remove and return a random member from the set
/// Returns None if the set is empty
pub fn pop(&mut self) -> Option<SDS> {
Expand Down
Loading
Loading