diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 32454ceb..d361b969 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -15,10 +15,12 @@ use std::{ net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, sync::Arc, + time::SystemTime, }; use tokio_util::sync::CancellationToken; use clap::Parser; +use ethlambda_blockchain::MILLISECONDS_PER_SLOT; use ethlambda_blockchain::key_manager::ValidatorKeyPair; use ethlambda_network_api::{InitBlockChain, InitP2P, ToBlockChainToP2PRef, ToP2PToBlockChainRef}; use ethlambda_p2p::{Bootnode, P2P, PeerId, SwarmConfig, build_swarm, parse_enrs}; @@ -36,7 +38,9 @@ use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; use ethlambda_rpc::RpcConfig; -use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend}; +use ethlambda_storage::{ + MAX_RESUMABLE_DB_STATE_AGE, StorageBackend, Store, backend::RocksDBBackend, +}; const ASCII_ART: &str = r#" _ _ _ _ _ @@ -635,6 +639,30 @@ async fn fetch_initial_state( }; // Checkpoint sync path + + // Prefer resuming from a fresh on-disk state to avoid re-downloading what we already have. + if let Some(store) = Store::from_db_state(backend.clone(), genesis.genesis_time) { + let now_ms = SystemTime::UNIX_EPOCH + .elapsed() + .expect("already past the unix epoch") + .as_millis() as u64; + let current_slot = + now_ms.saturating_sub(genesis.genesis_time * 1000) / MILLISECONDS_PER_SLOT; + let finalized_slot = store.latest_finalized().slot; + let gap = current_slot.saturating_sub(finalized_slot); + if gap <= MAX_RESUMABLE_DB_STATE_AGE { + info!( + finalized_slot, + current_slot, gap, "Resuming from existing DB state" + ); + return Ok(store); + } + warn!( + finalized_slot, + current_slot, gap, "Existing DB state is stale; falling through to checkpoint sync" + ); + } + info!(%checkpoint_url, "Starting checkpoint sync"); // The state and block are fetched in parallel; if the peer advances diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 9662a36c..bb27e949 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,4 +3,4 @@ pub mod backend; mod store; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{ForkCheckpoints, GetForkchoiceStoreError, Store}; +pub use store::{ForkCheckpoints, GetForkchoiceStoreError, MAX_RESUMABLE_DB_STATE_AGE, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 1ba86503..d765d8f6 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -19,7 +19,7 @@ use ethlambda_types::{ }; use libssz::{SszDecode, SszEncode}; use thiserror::Error; -use tracing::info; +use tracing::{info, warn}; /// Errors returned by [`Store::get_forkchoice_store`]. #[derive(Debug, Error)] @@ -105,6 +105,9 @@ const BLOCKS_TO_KEEP: usize = 21_600; /// ~3.3 hours of state history at 4-second slots (12000 / 4 = 3000). const STATES_TO_KEEP: usize = 3_000; +/// ~30 minutes of resume window at 4-second slots (1800 / 4 = 450). +pub const MAX_RESUMABLE_DB_STATE_AGE: u64 = 450; + const _: () = assert!( BLOCKS_TO_KEEP >= STATES_TO_KEEP, "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" @@ -550,6 +553,41 @@ impl Store { )) } + /// Build a Store from the state already persisted in the storage backend. + /// + /// Returns `None` if the backend is empty or its persisted `genesis_time` + /// doesn't match `expected_genesis_time`. + pub fn from_db_state( + backend: Arc, + expected_genesis_time: u64, + ) -> Option { + let persisted_config = { + let view = backend.begin_read().expect("read view"); + let bytes = view.get(Table::Metadata, KEY_CONFIG).expect("get config")?; + // probe KEY_LATEST_FINALIZED + view.get(Table::Metadata, KEY_LATEST_FINALIZED) + .expect("get latest finalized")?; + ChainConfig::from_ssz_bytes(&bytes).expect("valid config") + }; + if persisted_config.genesis_time != expected_genesis_time { + warn!( + db_genesis_time = persisted_config.genesis_time, + expected_genesis_time, + "Persisted DB has a different genesis_time; treating as empty" + ); + return None; + } + info!("Loaded store from persisted DB state"); + Some(Self { + backend, + new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), + known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), + gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( + GOSSIP_SIGNATURE_CAP, + ))), + }) + } + /// Internal helper to initialize the store with anchor data. /// /// Header is taken from `anchor_state.latest_block_header`. @@ -2548,4 +2586,46 @@ mod tests { let store = Store::from_anchor_state(backend, State::from_genesis(0, vec![])); assert!(store.get_signed_block(&root).is_none()); } + + // ============ from_db_state Tests ============ + + #[test] + fn from_db_state_returns_none_on_empty_backend() { + let backend: Arc = Arc::new(InMemoryBackend::new()); + assert!(Store::from_db_state(backend, 12345).is_none()); + } + + #[test] + fn from_db_state_returns_some_on_matching_genesis_time() { + let backend: Arc = Arc::new(InMemoryBackend::new()); + // Write an initial state to the backend. + let _ = Store::from_anchor_state(backend.clone(), State::from_genesis(12345, vec![])); + assert!(Store::from_db_state(backend, 12345).is_some()); + } + + #[test] + fn from_db_state_returns_none_on_genesis_time_mismatch() { + let backend: Arc = Arc::new(InMemoryBackend::new()); + // Write an initial state to the backend. + let _ = Store::from_anchor_state(backend.clone(), State::from_genesis(12345, vec![])); + assert!(Store::from_db_state(backend, 99999).is_none()); + } + + #[test] + fn from_db_state_returns_none_when_latest_finalized_is_missing() { + let backend: Arc = Arc::new(InMemoryBackend::new()); + // Write only KEY_CONFIG, leaving KEY_LATEST_FINALIZED absent. + let config = ChainConfig { + genesis_time: 12345, + }; + let mut batch = backend.begin_write().expect("write batch"); + batch + .put_batch( + Table::Metadata, + vec![(KEY_CONFIG.to_vec(), config.to_ssz())], + ) + .expect("put config"); + batch.commit().expect("commit"); + assert!(Store::from_db_state(backend, 12345).is_none()); + } }