Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
576b191
feat(p2p): add inbound BlocksByRange req/resp support
dicethedev May 7, 2026
cba8385
Update crates/net/p2p/src/req_resp/handlers.rs
dicethedev May 7, 2026
0130146
Merge branch 'main' into feat/blocks-by-range-inbound-support
dicethedev May 8, 2026
815814e
feat(p2p): use BlocksByRange for long-range sync
dicethedev May 8, 2026
9fc8d73
fix(clippy): use is_multiple_of for slot step check
dicethedev May 8, 2026
9028a29
Update crates/net/p2p/src/req_resp/handlers.rs
dicethedev May 8, 2026
71bc472
fix(p2p): cap BlocksByRange sync count to prevent unbounded request loop
dicethedev May 8, 2026
300a1af
fix(p2p): bound canonical_blocks_by_range traversal depth to prevent DoS
dicethedev May 8, 2026
659456e
Merge branch 'main' into feat/blocks-by-range-long-range-sync
dicethedev May 12, 2026
1e97021
Fix BlocksByRange response routing
dicethedev May 12, 2026
41bcebf
fix(p2p): address BlocksByRange review feedback
dicethedev May 15, 2026
f201a38
Merge branch 'main' into feat/blocks-by-range-long-range-sync
MegaRedHand May 19, 2026
50e85ac
fix(p2p): remove legacy step field from BlocksByRangeRequest and upda…
dicethedev May 19, 2026
3cac9a6
Merge branch 'main' into feat/blocks-by-range-long-range-sync
pablodeymo May 22, 2026
f7c9d34
Merge branch 'main' into feat/blocks-by-range-long-range-sync
pablodeymo May 27, 2026
c430e79
refactor(p2p): merge request_id_map and range_request_ids into outbou…
dicethedev May 28, 2026
641dc51
Merge branch 'main' into feat/blocks-by-range-long-range-sync
MegaRedHand Jun 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 53 additions & 9 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use crate::{
},
req_resp::{
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer, request_blocks_by_range_from_peer,
},
swarm_adapter::SwarmHandle,
};
Expand All @@ -59,12 +59,22 @@ const MAX_FETCH_RETRIES: u32 = 10;
const INITIAL_BACKOFF_MS: u64 = 5;
const BACKOFF_MULTIPLIER: u64 = 2;
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days)

pub(crate) struct PendingRequest {
pub(crate) attempts: u32,
pub(crate) failed_peers: HashSet<PeerId>,
}

pub(crate) enum PendingRequestKind {
Root(H256),
Range {
start_slot: u64,
end_slot: u64,
total_end_slot: u64,
},
}

// --- Swarm construction ---

/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining identify, Gossipsub
Expand Down Expand Up @@ -300,8 +310,9 @@ impl P2P {
block_topic: built.block_topic,
aggregation_topic: built.aggregation_topic,
connected_peers: HashSet::new(),
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
pending_root_requests: HashMap::new(),
outbound_requests: HashMap::new(),
pending_range_requests: HashSet::new(),
bootnode_addrs: built.bootnode_addrs,
node_names,
};
Expand Down Expand Up @@ -336,8 +347,9 @@ pub struct P2PServer {
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,

pub(crate) connected_peers: HashSet<PeerId>,
pub(crate) pending_requests: HashMap<H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, H256>,
pub(crate) pending_root_requests: HashMap<H256, PendingRequest>,
pub(crate) outbound_requests: HashMap<OutboundRequestId, PendingRequestKind>,
pub(crate) pending_range_requests: HashSet<(u64, u64)>,
bootnode_addrs: HashMap<PeerId, Multiaddr>,
node_names: HashMap<PeerId, String>,
}
Expand All @@ -359,6 +371,13 @@ pub(crate) trait P2PProtocol: Send + Sync {
fn retry_block_fetch(&self, root: H256) -> Result<(), ActorError>;
#[allow(dead_code)] // invoked via send_after, not called directly
fn retry_peer_redial(&self, peer_id: PeerId) -> Result<(), ActorError>;
#[allow(dead_code)]
fn retry_range_sync(
&self,
start_slot: u64,
end_slot: u64,
peer_id: PeerId,
) -> Result<(), ActorError>;
}

#[actor(protocol = P2PProtocol)]
Expand All @@ -371,7 +390,7 @@ impl P2PServer {
) {
let root = msg.root;
// Check if still pending (might have succeeded during backoff)
if !self.pending_requests.contains_key(&root) {
if !self.pending_root_requests.contains_key(&root) {
trace!(%root, "Block fetch completed during backoff, skipping retry");
return;
}
Expand All @@ -380,7 +399,7 @@ impl P2PServer {

if !fetch_block_from_peer(self, root).await {
tracing::error!(%root, "Failed to retry block fetch, giving up");
self.pending_requests.remove(&root);
self.pending_root_requests.remove(&root);
}
}

Expand All @@ -403,6 +422,31 @@ impl P2PServer {
self.swarm_handle.dial(addr.clone());
}
}

#[send_handler]
async fn handle_retry_range_sync(
&mut self,
msg: p2p_protocol::RetryRangeSync,
_ctx: &Context<Self>,
) {
let start_slot = msg.start_slot;
let end_slot = msg.end_slot;
let peer = msg.peer_id;

// safety check: if already synced, skip retry
let still_needed = !self
.pending_range_requests
.contains(&(start_slot, end_slot));

if still_needed {
tracing::trace!(%peer, start_slot, end_slot, "Skipping retry, range already resolved");
return;
}
Comment on lines +436 to +444
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We remove the entry from pending_range_requests right before sending the RetryRangeSync message. That would make this always land on this skip on error.


info!(%peer, start_slot, end_slot, "Retrying BlocksByRange sync");

request_blocks_by_range_from_peer(self, peer, start_slot, end_slot).await;
}
}

// --- Manual Handler impls for network-api messages ---
Expand Down Expand Up @@ -436,7 +480,7 @@ impl Handler<FetchBlock> for P2PServer {
async fn handle(&mut self, msg: FetchBlock, _ctx: &Context<Self>) {
let root = msg.root;
// Deduplicate - if already pending, ignore
if self.pending_requests.contains_key(&root) {
if self.pending_root_requests.contains_key(&root) {
trace!(%root, "Block fetch already in progress, ignoring duplicate");
return;
}
Expand Down
Loading