diff --git a/CHANGELOG.sf.md b/CHANGELOG.sf.md index 3b8f041987..9a4eed3f11 100644 --- a/CHANGELOG.sf.md +++ b/CHANGELOG.sf.md @@ -1,3 +1,7 @@ +## v0.9.1-fh-1 + +* Fixed flash blocks to arrive in the right order and be 100% identical to the canonical blocks + ## v0.9.1-fh * bumped upstream to 0.9.1 diff --git a/crates/firehose-flashblocks/src/processor.rs b/crates/firehose-flashblocks/src/processor.rs index 747e0aa965..4ca10560a2 100644 --- a/crates/firehose-flashblocks/src/processor.rs +++ b/crates/firehose-flashblocks/src/processor.rs @@ -131,10 +131,7 @@ impl std::fmt::Debug for SpeculativeStateRoot { .field("block_number", &self.block_number) .field("flashblock_index", &self.flashblock_index) .field("revision", &self.revision) - .field( - "result", - &self.result.lock().ok().and_then(|guard| *guard), - ) + .field("result", &self.result.lock().ok().and_then(|guard| *guard)) .finish_non_exhaustive() } } @@ -426,11 +423,7 @@ where pub fn speculative_state_root_status_for_test(&self) -> Option<(u64, u64, u64, bool)> { let state = self.state.lock().expect("flashblock state mutex poisoned"); let spec = state.speculative_state_root.as_ref()?; - let completed = spec - .result - .lock() - .expect("spec result mutex poisoned") - .is_some(); + let completed = spec.result.lock().expect("spec result mutex poisoned").is_some(); Some((spec.block_number, spec.flashblock_index, spec.revision, completed)) } @@ -481,10 +474,7 @@ where /// /// - **None** — `(false, None)`: peek is absent or unrelated; the current flashblock /// executes and emits as a non-final partial. - fn classify_peek( - current: &Flashblock, - peek: Option<&Flashblock>, - ) -> (bool, Option) { + fn classify_peek(current: &Flashblock, peek: Option<&Flashblock>) -> (bool, Option) { let Some(peek) = peek else { return (false, None) }; let cur_block = current.metadata.block_number; let peek_block = peek.metadata.block_number; @@ -697,12 +687,30 @@ where // divergence between our `BlockAssembler` and reth's canonical // sealing, etc. — see the diagnostic fields surfaced below). if !state.final_part_sent && !state.stored_flashblocks.is_empty() { + // Same optimization as `on_canonical_block`: the live + // revision still matches what the speculative precompute + // captured (no flashblock has executed for the previous + // block since the spec was launched), so a hit avoids + // the synchronous trie traversal. + let cached_state_root = Self::cached_state_root_for( + &state, + latest_block, + state.bundle_revision, + ); + if cached_state_root.is_some() { + debug!( + block = latest_block, + revision = state.bundle_revision, + "FirstOfNextBlock-fallback: using speculatively-precomputed state_root" + ); + } match Self::build_is_final_emission( latest_block, latest_idx, &state.stored_flashblocks, state.accumulated_db.as_ref(), stored_parent_hash, + cached_state_root, ) { Ok(emission) => { pending_final_emission = Some(emission); @@ -849,7 +857,8 @@ where if squash { debug!( block = block_number, - index, "squashing flashblock execution; accumulated for next non-squashed flashblock" + index, + "squashing flashblock execution; accumulated for next non-squashed flashblock" ); // Still flush any pending is_final FIRE BLOCK queued by the // FirstOfNextBlock transition (squashing the new base must not drop @@ -1023,8 +1032,7 @@ where let Ok(rt) = tokio::runtime::Handle::try_current() else { debug!( block_number, - index, - "no tokio runtime in scope; skipping speculative state-root precompute" + index, "no tokio runtime in scope; skipping speculative state-root precompute" ); return; }; @@ -1037,19 +1045,18 @@ where let parent_block = block_number.saturating_sub(1); let handle = rt.spawn_blocking(move || { - let provider = match client - .state_by_block_number_or_tag(BlockNumberOrTag::Number(parent_block)) - { - Ok(p) => p, - Err(err) => { - debug!( - parent_block, - error = %err, - "speculative state-root: parent state unavailable" - ); - return; - } - }; + let provider = + match client.state_by_block_number_or_tag(BlockNumberOrTag::Number(parent_block)) { + Ok(p) => p, + Err(err) => { + debug!( + parent_block, + error = %err, + "speculative state-root: parent state unavailable" + ); + return; + } + }; let hashed = provider.hashed_post_state(&bundle); match provider.state_root(hashed) { Ok(root) => { @@ -1075,12 +1082,7 @@ where result, handle, }); - debug!( - block_number, - index, - revision, - "speculative state-root precompute launched" - ); + debug!(block_number, index, revision, "speculative state-root precompute launched"); } /// Returns the speculatively-precomputed state_root if one exists and is still @@ -1093,6 +1095,19 @@ where expected_revision: u64, ) -> Option { let state = self.state.lock().ok()?; + Self::cached_state_root_for(&state, block_number, expected_revision) + } + + /// Same as [`Self::try_speculative_state_root`] but reads from a `ProcessorState` + /// guard the caller already holds — used by [`Self::on_canonical_block`] and the + /// `FirstOfNextBlock` fallback in [`Self::process_inner`], both of which lock + /// `self.state` for the duration of the is_final-emission decision and would + /// deadlock against re-locking inside [`Self::try_speculative_state_root`]. + fn cached_state_root_for( + state: &ProcessorState, + block_number: u64, + expected_revision: u64, + ) -> Option { let spec = state.speculative_state_root.as_ref()?; if spec.block_number != block_number || spec.revision != expected_revision { return None; @@ -1165,14 +1180,29 @@ where state.awaiting_canonical_confirmation = false; return; } - let stored_clone = state.stored_flashblocks.clone(); let latest_idx = state.latest_flashblock_index.unwrap_or(0); + // No bundle-changing work has happened since the last flashblock + // execution, so the live revision is the one the speculative + // precompute (if any) was launched against. A hit skips the slow + // synchronous state_root and is the key to beating reth's full-block + // FIRE BLOCK out the door. + let cached_state_root = + Self::cached_state_root_for(&state, canonical_block_number, state.bundle_revision); + if cached_state_root.is_some() { + debug!( + block = canonical_block_number, + revision = state.bundle_revision, + "canonical-driven is_final: using speculatively-precomputed state_root" + ); + } + match Self::build_is_final_emission( canonical_block_number, latest_idx, - &stored_clone, + &state.stored_flashblocks, state.accumulated_db.as_ref(), canonical_block_hash, + cached_state_root, ) { Ok(emission) => { // emit_final_if_pending locks self.tracer only; it does not @@ -1332,10 +1362,8 @@ where }; let all_transactions: Vec = stored.iter().flat_map(|fb| fb.diff.transactions.clone()).collect(); - let merged_index = stored - .last() - .expect("pending_state implies non-empty stored_flashblocks") - .index; + let merged_index = + stored.last().expect("pending_state implies non-empty stored_flashblocks").index; if let Err(err) = self.execute_flashblock( &assembled, merged_index, @@ -1445,6 +1473,7 @@ where flashblocks: &[Flashblock], accumulated_db: Option<&AccumulatedDb>, expected_parent_hash: B256, + precomputed_state_root: Option, ) -> Result { let assembled = BlockAssembler::assemble(flashblocks) .map_err(|err| BuildIsFinalError::AssembleFailed(format!("{err:?}")))?; @@ -1462,8 +1491,17 @@ where let assembled_header_hash_with_wire_state_root = assembled.block.header.hash_slow(); let total_transactions: usize = flashblocks.iter().map(|fb| fb.diff.transactions.len()).sum(); - let state_root = Self::compute_state_root(accumulated_db) - .map_err(|err| BuildIsFinalError::StateRootFailed(format!("{err}")))?; + // When the caller supplies a precomputed state_root (typically from the + // speculative cache that fires at index ≥ 10), skip the synchronous + // 100-150 ms trie traversal — this is the entire point of the cache, + // and it's what lets the canonical-driven is_final FIRE BLOCK win the + // race against reth's full-block FIRE BLOCK emitted from `mark_verified` + // after its own state-root validation. + let state_root = match precomputed_state_root { + Some(root) => root, + None => Self::compute_state_root(accumulated_db) + .map_err(|err| BuildIsFinalError::StateRootFailed(format!("{err}")))?, + }; let mut block = assembled.block.clone(); block.header.state_root = state_root; let recomputed_hash = block.header.hash_slow(); @@ -1673,28 +1711,29 @@ where let state = self.state.lock().expect("flashblock state mutex poisoned"); state.bundle_revision + if bundle_changed { 1 } else { 0 } }; - let state_root_result = match self - .try_speculative_state_root(block_number, projected_revision) - { - Some(root) => { - debug!( - block_number, - index, - revision = projected_revision, - state_root = %root, - "is_final: using speculatively-precomputed state_root" - ); - Ok(root) - } - None => Self::compute_state_root(Some(&*accumulated_db)), - }; + let state_root_result = + match self.try_speculative_state_root(block_number, projected_revision) { + Some(root) => { + debug!( + block_number, + index, + revision = projected_revision, + state_root = %root, + "is_final: using speculatively-precomputed state_root" + ); + Ok(root) + } + None => Self::compute_state_root(Some(&*accumulated_db)), + }; match state_root_result { Ok(state_root) => { let mut recomputed_header = assembled.block.header.clone(); recomputed_header.state_root = state_root; let recomputed_hash = recomputed_header.hash_slow(); if recomputed_hash == expected_parent_hash { - block_tracer.tracer_mut().set_final_flash_block(recomputed_hash, state_root); + block_tracer + .tracer_mut() + .set_final_flash_block(recomputed_hash, state_root); emitted_is_final = true; debug!( block = block_number, @@ -1719,9 +1758,8 @@ where } } Err(err) => { - let io_err = std::io::Error::other(format!( - "state_root computation failed: {err}" - )); + let io_err = + std::io::Error::other(format!("state_root computation failed: {err}")); warn!( block = block_number, index, @@ -1809,11 +1847,7 @@ where self.process(flashblock, false, None); } - fn on_flashblock_received_with_peek( - &self, - flashblock: Flashblock, - peek: Option<&Flashblock>, - ) { + fn on_flashblock_received_with_peek(&self, flashblock: Flashblock, peek: Option<&Flashblock>) { let (squash, is_final_expected_hash) = Self::classify_peek(&flashblock, peek); self.process(flashblock, squash, is_final_expected_hash); }