diff --git a/Cargo.lock b/Cargo.lock
index 1cb6b50f..0490655e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2118,6 +2118,7 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "tracing",
+ "vergen-git2",
 ]
 
 [[package]]
diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml
index d766b6a8..5663ec93 100644
--- a/crates/net/p2p/Cargo.toml
+++ b/crates/net/p2p/Cargo.toml
@@ -44,6 +44,7 @@ libssz-merkle.workspace = true
 libssz-types.workspace = true
 
 sha2 = "0.10"
-
-[dev-dependencies]
 hex.workspace = true
+
+[build-dependencies]
+vergen-git2.workspace = true
diff --git a/crates/net/p2p/build.rs b/crates/net/p2p/build.rs
new file mode 100644
index 00000000..b0af9402
--- /dev/null
+++ b/crates/net/p2p/build.rs
@@ -0,0 +1,58 @@
+use std::{env, fs, path::PathBuf};
+
+use vergen_git2::{Emitter, Git2Builder};
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Embed the build's short git SHA (consumed via env!("VERGEN_GIT_SHA")) so
+    // publish-side gossip diagnostics can report which client build emitted a
+    // message. Mirrors bin/ethlambda/build.rs.
+    let git2 = Git2Builder::default().sha(true).build()?;
+    Emitter::default().add_instructions(&git2)?.emit()?;
+
+    // Surface the resolved `snap` crate version so the same diagnostics can
+    // record which snappy implementation produced the compressed payload. The
+    // version is not available via any standard Cargo env var, so we read it
+    // out of the workspace lockfile.
+    let snap_version = snap_version().unwrap_or_else(|| "unknown".to_string());
+    println!("cargo:rustc-env=SNAP_VERSION={snap_version}");
+
+    Ok(())
+}
+
+/// Parse the `snap` package version out of the workspace `Cargo.lock`.
+///
+/// Walks up from this crate's manifest dir until a `Cargo.lock` is found, then
+/// scans for the `snap` package entry. Returns `None` if the lockfile or entry
+/// is missing (the caller falls back to `"unknown"`).
+fn snap_version() -> Option<String> {
+    let mut dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").ok()?);
+    let lockfile = loop {
+        let candidate = dir.join("Cargo.lock");
+        if candidate.is_file() {
+            break candidate;
+        }
+        if !dir.pop() {
+            return None;
+        }
+    };
+    println!("cargo:rerun-if-changed={}", lockfile.display());
+
+    let contents = fs::read_to_string(&lockfile).ok()?;
+    let mut lines = contents.lines();
+    while let Some(line) = lines.next() {
+        if line.trim() != "name = \"snap\"" {
+            continue;
+        }
+        // Within the same `[[package]]` block, find the version line.
+        for next in lines.by_ref() {
+            let next = next.trim();
+            if next.starts_with("[[package]]") {
+                break;
+            }
+            if let Some(version) = next.strip_prefix("version = \"") {
+                return Some(version.trim_end_matches('"').to_string());
+            }
+        }
+    }
+    None
+}
diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs
index c257006b..408d9d79 100644
--- a/crates/net/p2p/src/gossipsub/handler.rs
+++ b/crates/net/p2p/src/gossipsub/handler.rs
@@ -6,6 +6,7 @@ use ethlambda_types::{
 };
 use libp2p::gossipsub::Event;
 use libssz::{SszDecode, SszEncode};
+use sha2::{Digest as _, Sha256};
 use tracing::{error, info, trace};
 
 use super::{
@@ -15,11 +16,56 @@ use super::{
         attestation_subnet_topic,
     },
 };
-use crate::{P2PServer, metrics};
+use crate::{P2PServer, gossip_message_id, metrics};
+
+/// Short git SHA of this build, embedded by `build.rs`. Logged with publish-side
+/// gossip diagnostics so a captured message can be traced to the emitting build.
+const CLIENT_GIT_SHA: &str = env!("VERGEN_GIT_SHA");
+
+/// Snappy implementation and resolved version (the Rust `snap` crate), embedded
+/// by `build.rs`. Logged so cross-client byte comparisons can attribute the
+/// compressed output to a specific snappy library.
+const SNAPPY_LIB_VERSION: &str = concat!("rust-snap/", env!("SNAP_VERSION"));
+
+/// Pre-publish diagnostics for a gossipsub message, capturing the exact bytes a
+/// node is about to put on the wire. Used to debug cross-client snappy/SSZ
+/// corruption (e.g. blockblaz/zeam#942): comparing these fields against what a
+/// peer logs on receipt pinpoints whether divergence is at the compression,
+/// transport, or decode stage.
+struct PublishDiagnostics {
+    /// Lowercase hex SHA256 of the uncompressed SSZ payload.
+    ssz_sha256: String,
+    /// Lowercase hex SHA256 of the snappy-compressed payload (the on-wire bytes).
+    compressed_sha256: String,
+    /// Length in bytes of the compressed payload.
+    compressed_len: usize,
+    /// Whether decompressing our own output round-trips back to the SSZ bytes.
+    /// `false` signals a local snappy encoder bug before the message ever leaves.
+    snappy_self_decode_ok: bool,
+    /// Lowercase hex gossipsub message ID, computed identically to the receive
+    /// path so it matches the ID peers will assign.
+    message_id: String,
+}
+
+impl PublishDiagnostics {
+    /// Compute diagnostics for `topic` from the uncompressed `ssz` and its
+    /// `compressed` (on-wire) form.
+    fn new(topic: &str, ssz: &[u8], compressed: &[u8]) -> Self {
+        let snappy_self_decode_ok =
+            decompress_message(compressed).is_ok_and(|decoded| decoded == ssz);
+        Self {
+            ssz_sha256: hex::encode(Sha256::digest(ssz)),
+            compressed_sha256: hex::encode(Sha256::digest(compressed)),
+            compressed_len: compressed.len(),
+            snappy_self_decode_ok,
+            message_id: hex::encode(gossip_message_id(topic, compressed)),
+        }
+    }
+}
 
 pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
     let Event::Message {
-        propagation_source: _,
+        propagation_source,
         message_id: _,
         message,
     } = event
@@ -32,9 +78,9 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
         Some(BLOCK_TOPIC_KIND) => {
             info!(kind = "block", peer_count, "P2P message received");
             let compressed_len = message.data.len();
-            let Ok(uncompressed_data) = decompress_message(&message.data)
-                .inspect_err(|err| error!(%err, "Failed to decompress gossipped block"))
-            else {
+            let Ok(uncompressed_data) = decompress_message(&message.data).inspect_err(
+                |err| error!(%err, %propagation_source, "Failed to decompress gossipped block"),
+            ) else {
                 return;
             };
             metrics::observe_gossip_block_size(uncompressed_data.len(), compressed_len);
@@ -67,7 +113,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
             info!(kind = "aggregation", peer_count, "P2P message received");
             let compressed_len = message.data.len();
             let Ok(uncompressed_data) = decompress_message(&message.data)
-                .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation"))
+                .inspect_err(|err| error!(%err, %propagation_source, "Failed to decompress gossipped aggregation"))
             else {
                 return;
             };
@@ -99,7 +145,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
             info!(kind = "attestation", peer_count, "P2P message received");
             let compressed_len = message.data.len();
             let Ok(uncompressed_data) = decompress_message(&message.data)
-                .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation"))
+                .inspect_err(|err| error!(%err, %propagation_source, "Failed to decompress gossipped attestation"))
             else {
                 return;
             };
@@ -154,6 +200,22 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte
         .cloned()
         .unwrap_or_else(|| attestation_subnet_topic(subnet_id));
 
+    let topic_hash = topic.hash();
+    let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed);
+    info!(
+        topic = %topic_hash,
+        %slot,
+        validator,
+        ssz_sha256 = %diagnostics.ssz_sha256,
+        compressed_sha256 = %diagnostics.compressed_sha256,
+        compressed_len = diagnostics.compressed_len,
+        snappy_self_decode_ok = diagnostics.snappy_self_decode_ok,
+        message_id = %diagnostics.message_id,
+        git_sha = CLIENT_GIT_SHA,
+        snappy = SNAPPY_LIB_VERSION,
+        "Publishing attestation to gossipsub (publish diagnostics)"
+    );
+
     server.swarm_handle.publish(topic, compressed);
     info!(
         %slot,
@@ -182,6 +244,23 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) {
 
     metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len());
 
+    let topic_hash = server.block_topic.hash();
+    let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed);
+    info!(
+        topic = %topic_hash,
+        %slot,
+        proposer,
+        block_root = %hex::encode(block_root.0),
+        ssz_sha256 = %diagnostics.ssz_sha256,
+        compressed_sha256 = %diagnostics.compressed_sha256,
+        compressed_len = diagnostics.compressed_len,
+        snappy_self_decode_ok = diagnostics.snappy_self_decode_ok,
+        message_id = %diagnostics.message_id,
+        git_sha = CLIENT_GIT_SHA,
+        snappy = SNAPPY_LIB_VERSION,
+        "Publishing block to gossipsub (publish diagnostics)"
+    );
+
     // Publish to gossipsub
     server
         .swarm_handle
@@ -210,6 +289,21 @@ pub async fn publish_aggregated_attestation(
 
     metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len());
 
+    let topic_hash = server.aggregation_topic.hash();
+    let diagnostics = PublishDiagnostics::new(topic_hash.as_str(), &ssz_bytes, &compressed);
+    info!(
+        topic = %topic_hash,
+        %slot,
+        ssz_sha256 = %diagnostics.ssz_sha256,
+        compressed_sha256 = %diagnostics.compressed_sha256,
+        compressed_len = diagnostics.compressed_len,
+        snappy_self_decode_ok = diagnostics.snappy_self_decode_ok,
+        message_id = %diagnostics.message_id,
+        git_sha = CLIENT_GIT_SHA,
+        snappy = SNAPPY_LIB_VERSION,
+        "Publishing aggregated attestation to gossipsub (publish diagnostics)"
+    );
+
     // Publish to the aggregation topic
     server
         .swarm_handle
diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs
index f1fa5fe4..11261ce6 100644
--- a/crates/net/p2p/src/lib.rs
+++ b/crates/net/p2p/src/lib.rs
@@ -694,25 +694,37 @@ fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str
     }
 }
 
-fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId {
+/// Compute the gossipsub message ID for a topic and its on-wire (snappy) data.
+///
+/// The ID is the 20-byte truncated SHA256 of `domain || topic_len_le || topic
+/// || payload`, where `payload` is the snappy-decompressed bytes when `data`
+/// is valid snappy (domain `0x01000000`) and the raw `data` otherwise (domain
+/// `0x00000000`). Shared by the gossipsub `message_id_fn` and the publish-side
+/// diagnostics so the logged ID matches the one peers will assign.
+pub(crate) fn gossip_message_id(topic: &str, data: &[u8]) -> [u8; 20] {
     const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00];
     const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00];
-    let mut hasher = sha2::Sha256::new();
 
-    let decompressed = gossipsub::decompress_message(&message.data).ok();
-
-    let (domain, data) = match decompressed.as_deref() {
-        Some(data) => (MESSAGE_DOMAIN_VALID_SNAPPY, data),
-        None => (MESSAGE_DOMAIN_INVALID_SNAPPY, message.data.as_slice()),
+    let decompressed = gossipsub::decompress_message(data).ok();
+    let (domain, payload) = match decompressed.as_deref() {
+        Some(payload) => (MESSAGE_DOMAIN_VALID_SNAPPY, payload),
+        None => (MESSAGE_DOMAIN_INVALID_SNAPPY, data),
     };
-    let topic = message.topic.as_str().as_bytes();
-    let topic_len = (topic.len() as u64).to_le_bytes();
+
+    let mut hasher = sha2::Sha256::new();
     hasher.update(domain);
-    hasher.update(topic_len);
-    hasher.update(topic);
-    hasher.update(data);
+    hasher.update((topic.len() as u64).to_le_bytes());
+    hasher.update(topic.as_bytes());
+    hasher.update(payload);
     let hash = hasher.finalize();
-    libp2p::gossipsub::MessageId(hash[..20].to_vec())
+
+    let mut id = [0u8; 20];
+    id.copy_from_slice(&hash[..20]);
+    id
+}
+
+fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId {
+    libp2p::gossipsub::MessageId(gossip_message_id(message.topic.as_str(), &message.data).to_vec())
 }
 
 #[cfg(test)]