diff --git a/Cargo.lock b/Cargo.lock index 547c2bff..f3bfb5a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2010,6 +2010,7 @@ version = "0.1.0" dependencies = [ "clap", "ethlambda-blockchain", + "ethlambda-crypto", "ethlambda-network-api", "ethlambda-p2p", "ethlambda-rpc", @@ -2067,6 +2068,7 @@ dependencies = [ "leansig", "leansig_wrapper", "rand 0.10.1", + "rayon", "thiserror 2.0.18", ] diff --git a/bin/ethlambda/Cargo.toml b/bin/ethlambda/Cargo.toml index e5e22ee9..e39605ef 100644 --- a/bin/ethlambda/Cargo.toml +++ b/bin/ethlambda/Cargo.toml @@ -33,5 +33,14 @@ eyre.workspace = true tikv-jemallocator.workspace = true +# Only used by the `zk-alloc` feature, to install the proving-scoped global +# allocator and run leanVM's startup core-count assertion. +ethlambda-crypto = { workspace = true, optional = true } + +[features] +# Benchmark-only: swap jemalloc for leanVM's arena allocator, scoped to proving +# threads. Drops jemalloc + /debug/pprof heap profiling for this build. +zk-alloc = ["dep:ethlambda-crypto", "ethlambda-crypto/zk-alloc"] + [build-dependencies] vergen-git2.workspace = true diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index d361b969..58802d20 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -1,15 +1,22 @@ mod checkpoint_sync; mod version; -#[cfg(not(target_env = "msvc"))] +// Default allocator: jemalloc with heap profiling. Under the `zk-alloc` +// benchmark feature this is replaced by leanVM's proving-scoped arena allocator +// (`ethlambda_crypto::ScopedZkAlloc`), which is incompatible with jemalloc. 
+#[cfg(all(not(target_env = "msvc"), not(feature = "zk-alloc")))] #[global_allocator] static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; -#[cfg(not(target_env = "msvc"))] +#[cfg(all(not(target_env = "msvc"), not(feature = "zk-alloc")))] #[allow(non_upper_case_globals)] #[unsafe(export_name = "malloc_conf")] static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0"; +#[cfg(feature = "zk-alloc")] +#[global_allocator] +static ALLOC: ethlambda_crypto::ScopedZkAlloc = ethlambda_crypto::ScopedZkAlloc; + use std::{ collections::{BTreeMap, HashMap}, net::{IpAddr, SocketAddr}, @@ -162,10 +169,21 @@ async fn main() -> eyre::Result<()> { })?; let p2p_socket = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), options.gossipsub_port); - #[cfg(not(target_env = "msvc"))] + #[cfg(all(not(target_env = "msvc"), not(feature = "zk-alloc")))] info!("Using jemalloc allocator with heap profiling enabled"); - #[cfg(target_env = "msvc")] + #[cfg(all(target_env = "msvc", not(feature = "zk-alloc")))] info!("Using system allocator (MSVC target)"); + #[cfg(feature = "zk-alloc")] + { + // Asserts available_parallelism() == the core count this binary was + // built for; panics on mismatch. The binary must be built on (or for) + // the machine it runs on. See ethlambda_crypto::zk_alloc. + ethlambda_crypto::init_allocator(); + // Build the global rayon pool with arena-flagged workers before any + // other rayon user (leanVM's setup_prover) creates it unflagged. 
+ ethlambda_crypto::init_arena_rayon_pool(); + info!("Using zk-alloc arena allocator (proving-scoped, benchmark build)"); + } info!(node_key=?options.node_key, "got node key"); diff --git a/crates/common/crypto/Cargo.toml b/crates/common/crypto/Cargo.toml index e92ab3bb..22e6ca69 100644 --- a/crates/common/crypto/Cargo.toml +++ b/crates/common/crypto/Cargo.toml @@ -20,5 +20,14 @@ leansig.workspace = true thiserror.workspace = true rand.workspace = true +# Only needed by the `zk-alloc` feature, to mark the global rayon pool's prover +# threads around an arena phase. +rayon = { workspace = true, optional = true } + +[features] +# Benchmark-only: route leanVM's bump-arena allocator to proving threads via a +# scoped global allocator. See `src/zk_alloc.rs`. OFF by default. +zk-alloc = ["dep:rayon"] + [dev-dependencies] hex.workspace = true diff --git a/crates/common/crypto/src/lib.rs b/crates/common/crypto/src/lib.rs index 953fc7a8..9a4eb8e4 100644 --- a/crates/common/crypto/src/lib.rs +++ b/crates/common/crypto/src/lib.rs @@ -32,6 +32,72 @@ pub fn ensure_verifier_ready() { VERIFIER_INIT.call_once(setup_verifier); } +#[cfg(feature = "zk-alloc")] +mod zk_alloc; +#[cfg(feature = "zk-alloc")] +pub use zk_alloc::{ScopedZkAlloc, init_allocator, init_arena_rayon_pool}; + +// Exercise the real arena path in this crate's test binary: the aggregate/verify +// round-trip tests below then allocate through `ScopedZkAlloc`, so proving +// allocations actually hit leanVM's arena and outputs must survive serialization. +#[cfg(all(test, feature = "zk-alloc"))] +#[global_allocator] +static TEST_ALLOC: ScopedZkAlloc = ScopedZkAlloc; + +/// Run a Type-1 prover call, then serialize the proof to its on-wire bytes. +/// +/// Under the `zk-alloc` feature the prover call runs inside an arena phase +/// (serialized behind a global proving lock), and serialization happens *after* +/// the phase ends so the returned bytes land in the system allocator and survive +/// the next phase's slab reset. 
Without the feature this is just +/// `ensure_prover_ready` + `produce` + serialize. +#[cfg(feature = "zk-alloc")] +fn prove_type1(produce: F) -> Result +where + F: FnOnce() -> Result, +{ + // Must precede `ensure_prover_ready`: `setup_prover` is the first rayon user + // and would otherwise build an unflagged global pool. + zk_alloc::init_arena_rayon_pool(); + let session = zk_alloc::ArenaSession::begin(); + ensure_prover_ready(); + let proof = session.prove(produce); + compress_type1_to_byte_list(&proof?) +} + +#[cfg(not(feature = "zk-alloc"))] +fn prove_type1(produce: F) -> Result +where + F: FnOnce() -> Result, +{ + ensure_prover_ready(); + compress_type1_to_byte_list(&produce()?) +} + +/// Type-2 counterpart of [`prove_type1`]. +#[cfg(feature = "zk-alloc")] +fn prove_type2(produce: F) -> Result +where + F: FnOnce() -> Result, +{ + // Must precede `ensure_prover_ready`: `setup_prover` is the first rayon user + // and would otherwise build an unflagged global pool. + zk_alloc::init_arena_rayon_pool(); + let session = zk_alloc::ArenaSession::begin(); + ensure_prover_ready(); + let proof = session.prove(produce); + compress_type2_to_byte_list(&proof?) +} + +#[cfg(not(feature = "zk-alloc"))] +fn prove_type2(produce: F) -> Result +where + F: FnOnce() -> Result, +{ + ensure_prover_ready(); + compress_type2_to_byte_list(&produce()?) +} + /// Error type for signature aggregation operations. 
#[derive(Debug, Error)] pub enum AggregationError { @@ -164,18 +230,16 @@ pub fn aggregate_signatures( return Err(AggregationError::EmptyInput); } - ensure_prover_ready(); - - let raw_xmss: Vec<(LeanSigPubKey, LeanSigSignature)> = public_keys - .into_iter() - .zip(signatures) - .map(|(pk, sig)| (pk.into_inner(), sig.into_inner())) - .collect(); - - let proof = aggregate_type_1(&[], raw_xmss, message.0, slot, LOG_INV_RATE) - .map_err(|err| AggregationError::ProverFailure(err.to_string()))?; + prove_type1(move || { + let raw_xmss: Vec<(LeanSigPubKey, LeanSigSignature)> = public_keys + .into_iter() + .zip(signatures) + .map(|(pk, sig)| (pk.into_inner(), sig.into_inner())) + .collect(); - compress_type1_to_byte_list(&proof) + aggregate_type_1(&[], raw_xmss, message.0, slot, LOG_INV_RATE) + .map_err(|err| AggregationError::ProverFailure(err.to_string())) + }) } /// Aggregate both existing Type-1 proofs (children) and raw XMSS signatures. @@ -202,24 +266,22 @@ pub fn aggregate_mixed( return Err(AggregationError::InsufficientChildren(children.len())); } - ensure_prover_ready(); - - let children_native: Vec = children - .into_iter() - .enumerate() - .map(|(i, (pubkeys, proof_bytes))| decompress_type1(pubkeys, &proof_bytes, i)) - .collect::>()?; - - let raw_xmss: Vec<(LeanSigPubKey, LeanSigSignature)> = raw_public_keys - .into_iter() - .zip(raw_signatures) - .map(|(pk, sig)| (pk.into_inner(), sig.into_inner())) - .collect(); - - let proof = aggregate_type_1(&children_native, raw_xmss, message.0, slot, LOG_INV_RATE) - .map_err(|err| AggregationError::ProverFailure(err.to_string()))?; - - compress_type1_to_byte_list(&proof) + prove_type1(move || { + let children_native: Vec = children + .into_iter() + .enumerate() + .map(|(i, (pubkeys, proof_bytes))| decompress_type1(pubkeys, &proof_bytes, i)) + .collect::>()?; + + let raw_xmss: Vec<(LeanSigPubKey, LeanSigSignature)> = raw_public_keys + .into_iter() + .zip(raw_signatures) + .map(|(pk, sig)| (pk.into_inner(), 
sig.into_inner())) + .collect(); + + aggregate_type_1(&children_native, raw_xmss, message.0, slot, LOG_INV_RATE) + .map_err(|err| AggregationError::ProverFailure(err.to_string())) + }) } /// Recursively aggregate two or more already-aggregated Type-1 proofs into one. @@ -235,18 +297,16 @@ pub fn aggregate_proofs( return Err(AggregationError::InsufficientChildren(children.len())); } - ensure_prover_ready(); - - let children_native: Vec = children - .into_iter() - .enumerate() - .map(|(i, (pubkeys, proof_bytes))| decompress_type1(pubkeys, &proof_bytes, i)) - .collect::>()?; + prove_type1(move || { + let children_native: Vec = children + .into_iter() + .enumerate() + .map(|(i, (pubkeys, proof_bytes))| decompress_type1(pubkeys, &proof_bytes, i)) + .collect::>()?; - let proof = aggregate_type_1(&children_native, vec![], message.0, slot, LOG_INV_RATE) - .map_err(|err| AggregationError::ProverFailure(err.to_string()))?; - - compress_type1_to_byte_list(&proof) + aggregate_type_1(&children_native, vec![], message.0, slot, LOG_INV_RATE) + .map_err(|err| AggregationError::ProverFailure(err.to_string())) + }) } /// Verify a Type-1 aggregated signature proof. 
@@ -299,18 +359,16 @@ pub fn merge_type_1s_into_type_2( return Err(AggregationError::EmptyInput); } - ensure_prover_ready(); - - let type_1s_native: Vec = type_1s - .into_iter() - .enumerate() - .map(|(i, (pubkeys, proof_bytes))| decompress_type1(pubkeys, &proof_bytes, i)) - .collect::>()?; - - let merged = merge_many_type_1(type_1s_native, LOG_INV_RATE) - .map_err(|err| AggregationError::ProverFailure(err.to_string()))?; + prove_type2(move || { + let type_1s_native: Vec = type_1s + .into_iter() + .enumerate() + .map(|(i, (pubkeys, proof_bytes))| decompress_type1(pubkeys, &proof_bytes, i)) + .collect::>()?; - compress_type2_to_byte_list(&merged) + merge_many_type_1(type_1s_native, LOG_INV_RATE) + .map_err(|err| AggregationError::ProverFailure(err.to_string())) + }) } /// Verify a Type-2 merged proof against the per-component expected bindings. @@ -380,32 +438,30 @@ pub fn split_type_2_by_message( pubkeys_per_component: Vec>, message: &H256, ) -> Result { - ensure_prover_ready(); - - let pubkeys_per_info: Vec> = pubkeys_per_component - .into_iter() - .map(into_lean_pubkeys) - .collect(); - - let type_2 = LMType2::decompress_without_pubkeys(proof_data, pubkeys_per_info) - .ok_or(AggregationError::DeserializationFailed)?; - - let matches: Vec = type_2 - .info - .iter() - .enumerate() - .filter_map(|(i, info)| (info.without_pubkeys.message == message.0).then_some(i)) - .collect(); - let index = match matches.as_slice() { - [i] => *i, - [] => return Err(AggregationError::UnknownMessage), - _ => return Err(AggregationError::MultipleMessages), - }; - - let component = split_type_2(type_2, index, LOG_INV_RATE) - .map_err(|err| AggregationError::ProverFailure(err.to_string()))?; - - compress_type1_to_byte_list(&component) + prove_type1(move || { + let pubkeys_per_info: Vec> = pubkeys_per_component + .into_iter() + .map(into_lean_pubkeys) + .collect(); + + let type_2 = LMType2::decompress_without_pubkeys(proof_data, pubkeys_per_info) + 
.ok_or(AggregationError::DeserializationFailed)?; + + let matches: Vec = type_2 + .info + .iter() + .enumerate() + .filter_map(|(i, info)| (info.without_pubkeys.message == message.0).then_some(i)) + .collect(); + let index = match matches.as_slice() { + [i] => *i, + [] => return Err(AggregationError::UnknownMessage), + _ => return Err(AggregationError::MultipleMessages), + }; + + split_type_2(type_2, index, LOG_INV_RATE) + .map_err(|err| AggregationError::ProverFailure(err.to_string())) + }) } #[cfg(test)] diff --git a/crates/common/crypto/src/zk_alloc.rs b/crates/common/crypto/src/zk_alloc.rs new file mode 100644 index 00000000..3ec3233a --- /dev/null +++ b/crates/common/crypto/src/zk_alloc.rs @@ -0,0 +1,188 @@ +//! Proving-scoped integration of leanVM's `zk-alloc` arena allocator. +//! +//! **Benchmark build only** (`zk-alloc` feature). The goal is to get leanVM's +//! bump-arena speedup for XMSS aggregation without destabilizing a long-running +//! node. +//! +//! # Why a dispatcher instead of installing `ZkAllocator` directly +//! +//! `ZkAllocator` is a *process-global* bump-arena: [`begin_phase`] flips a global +//! switch and resets every thread's slab. leanVM's own binary is safe because it +//! does nothing but prove between `begin_phase`/`end_phase`. ethlambda is not: +//! the tokio runtime, p2p, storage, and actor threads allocate continuously. Any +//! long-lived buffer one of them allocates during a phase would be silently +//! overwritten by the next phase's slab reset. +//! +//! [`ScopedZkAlloc`] is a `#[global_allocator]` that routes to the arena **only on +//! threads explicitly marked as proving**: the global rayon pool's workers, which +//! leanVM's prover uses exclusively (ethlambda itself never touches the global +//! pool). The pool is built at startup via [`init_arena_rayon_pool`] with a +//! `start_handler` that permanently flags each worker. Every other thread — +//! 
tokio, p2p, storage, the actor thread — always uses the system allocator, so +//! its allocations are never reset. The prover *caller* is also unflagged: only +//! the parallel work inside leanVM lands in the arena, and the assembled proof +//! itself lands in System. +//! +//! Two earlier designs failed and inform this one: +//! - Marking the global pool's workers with `rayon::broadcast` around each phase +//! deadlocked the node: broadcast left the pool's sleep/wakeup accounting in a +//! state where the prover's injected work was never stolen. +//! - A dedicated `install`-target pool crashed on the second proof: rayon's and +//! crossbeam's long-lived internals (epoch participants, work-stealing deque +//! buffers) were first allocated *inside* a phase, in arena memory the next +//! phase's slab reset corrupted (`crossbeam_epoch::try_advance` UAF, then +//! rayon `AbortIfPanic`). The global pool avoids this because `setup_prover` +//! runs heavy parallel work on it *before any phase exists*, growing those +//! internals to near-peak size in System memory — the same warmup leanVM's +//! own binaries rely on. +//! +//! Two conditions gate an arena allocation: the thread-local [`USE_ARENA`] flag +//! **and** leanVM's global `ARENA_ACTIVE` (checked inside `ZkAllocator::alloc`, +//! set by `begin_phase`). So even on a flagged thread, anything allocated outside +//! a phase — prover setup, output serialization — lands in the system allocator. +//! Input decompression is invoked inside the phase, on the unflagged calling thread. +//! +//! # Accepted limitations +//! +//! - jemalloc and its `/debug/pprof` heap endpoints are gone in this build: +//! `ZkAllocator::dealloc` forwards non-arena frees to `std::alloc::System`, so +//! the non-proving path must be `System` (libc), not jemalloc, or we would free +//! a jemalloc pointer with libc `free`. +//! - All proving is serialized behind [`PROVING`]; concurrent aggregation (the +//! spawn_blocking worker vs. actor-thread block building) blocks rather than +//! running in parallel. 
Acceptable for a benchmark; it also prevents the +//! `begin_phase` nesting panic. + +use std::alloc::{GlobalAlloc, Layout, System}; +use std::cell::Cell; +use std::sync::{Mutex, MutexGuard, Once}; + +use lean_multisig::{ZkAllocator, begin_phase, end_phase}; + +/// Re-exported so the binary can run leanVM's startup core-count assertion. +pub use lean_multisig::init_allocator; + +thread_local! { + /// Marks the current thread as a proving thread. While set *and* a phase is + /// active, this thread's allocations route to leanVM's arena; otherwise to + /// the system allocator. + static USE_ARENA: Cell = const { Cell::new(false) }; +} + +/// Serializes every entry into the leanVM prover so phases never nest and +/// `ARENA_ACTIVE` is only ever true for one thread group at a time. +pub static PROVING: Mutex<()> = Mutex::new(()); + +/// Global allocator that confines leanVM's arena to proving threads. +pub struct ScopedZkAlloc; + +// SAFETY: every returned pointer comes from either `ZkAllocator` (which yields +// arena memory only while a phase is active, System otherwise) or `System` +// directly. `dealloc` always defers to `ZkAllocator::dealloc`, which is +// address-based — arena pointers are a no-op, all others forward to System — +// so it correctly frees pointers produced by either path, from any thread. 
+unsafe impl GlobalAlloc for ScopedZkAlloc { + #[inline] + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + if USE_ARENA.get() { + unsafe { ZkAllocator.alloc(layout) } + } else { + unsafe { System.alloc(layout) } + } + } + + #[inline] + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + if USE_ARENA.get() { + unsafe { ZkAllocator.alloc_zeroed(layout) } + } else { + unsafe { System.alloc_zeroed(layout) } + } + } + + #[inline] + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + unsafe { ZkAllocator.dealloc(ptr, layout) }; + } + + #[inline] + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + // Must NOT defer to `ZkAllocator::realloc`: on growth it calls its own + // `alloc`, which gates on the global `ARENA_ACTIVE` rather than our + // thread-local flag, which would leak arena memory onto non-proving + // threads. Route through our own thread-gated alloc/dealloc instead. + if new_size <= layout.size() { + // Shrink in place; the existing block is large enough. Matches + // `ZkAllocator`'s behaviour and is valid for arena and System + // pointers alike. + return ptr; + } + let new_layout = unsafe { Layout::from_size_align_unchecked(new_size, layout.align()) }; + let new_ptr = unsafe { self.alloc(new_layout) }; + if !new_ptr.is_null() { + unsafe { std::ptr::copy_nonoverlapping(ptr, new_ptr, layout.size()) }; + unsafe { self.dealloc(ptr, layout) }; + } + new_ptr + } +} + +/// Build the **global** rayon pool with arena-flagged workers. Must run before +/// anything else initializes the global pool (leanVM's `setup_prover` is the +/// first rayon user in ethlambda). Idempotent. If the pool was already built by +/// someone else, its workers stay unflagged and proving simply falls back to the +/// system allocator — safe, just no arena speedup. 
+pub fn init_arena_rayon_pool() { + static INIT: Once = Once::new(); + INIT.call_once(|| { + rayon::ThreadPoolBuilder::new() + .thread_name(|i| format!("zk-prover-{i}")) + .start_handler(|_| USE_ARENA.set(true)) + .build_global() + .inspect_err(|err| { + eprintln!( + "zk-alloc: global rayon pool already initialized ({err}); \ + proving will not use the arena" + ); + }) + .ok(); + }); +} + +/// Holds the [`PROVING`] lock for one prover operation, serializing all proving so +/// phases never nest. The lock is released when the session is dropped, which must +/// be *after* the proof is serialized (the proof may reference arena memory that +/// the next phase would reset). +pub(crate) struct ArenaSession { + _lock: MutexGuard<'static, ()>, +} + +impl ArenaSession { + pub(crate) fn begin() -> Self { + let lock = PROVING.lock().unwrap_or_else(|poison| poison.into_inner()); + Self { _lock: lock } + } + + /// Run `produce` (the prover call) inside an arena phase. `produce` executes + /// on the calling thread (unflagged → System); the prover's internal rayon + /// work runs on the arena-flagged global pool workers. The returned value may + /// reference arena memory; it is safe to read until the next `begin_phase`, + /// which the held lock prevents. + pub(crate) fn prove(&self, produce: F) -> T + where + F: FnOnce() -> T, + { + begin_phase(); + // Guarantees `end_phase` runs even if the prover panics, so the global + // arena switch is never left stuck active. leanVM's `end_phase` also + // flushes the global pool's injector (its job blocks may live in arena). + struct EndOnDrop; + impl Drop for EndOnDrop { + fn drop(&mut self) { + end_phase(); + } + } + let _end = EndOnDrop; + produce() + } +}