From bc360e3182270156a18589b3a9635f8cd825ac8c Mon Sep 17 00:00:00 2001 From: varun-doshi Date: Mon, 18 Aug 2025 21:17:00 +0530 Subject: [PATCH] feat: cache incoming blocks to prevent duplication --- crates/manager/src/manager/mod.rs | 19 ++++++++++++- crates/network/src/manager.rs | 47 ++++++++++++++++++++++++------- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 4557ac7a..541d1d5f 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -7,6 +7,7 @@ use alloy_primitives::Signature; use alloy_provider::Provider; use futures::StreamExt; use reth_chainspec::EthChainSpec; +use reth_network::cache::LruCache; use reth_network_api::{block::NewBlockWithPeer as RethNewBlockWithPeer, FullNetwork}; use reth_scroll_node::ScrollNetworkPrimitives; use reth_scroll_primitives::ScrollBlock; @@ -23,6 +24,7 @@ use scroll_engine::{EngineDriver, EngineDriverEvent, ForkchoiceState}; use scroll_network::{ BlockImportOutcome, NetworkManagerEvent, NewBlockWithPeer, ScrollNetworkManager, }; +use scroll_wire::LRU_CACHE_SIZE; use std::{ fmt::{self, Debug, Formatter}, future::Future, @@ -189,7 +191,7 @@ where /// Returns a new event listener for the rollup node manager. pub fn event_listener(&mut self) -> EventStream { if let Some(event_sender) = &self.event_sender { - return event_sender.new_listener() + return event_sender.new_listener(); }; let event_sender = EventSender::new(EVENT_CHANNEL_SIZE); @@ -363,6 +365,21 @@ where .checked_sub(ECDSA_SIGNATURE_LEN) .and_then(|i| Signature::from_raw(&extra_data[i..]).ok()) { + let block_hash = block.hash_slow(); + if self.network.blocks_seen.contains(&(block_hash, signature)) { + return; + } else { + // Update the state of the peer cache i.e. peer has seen this block. + self.network + .scroll_wire + .state_mut() + .entry(peer_id) + .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) + .insert(block_hash); + // Update the state of the block cache i.e. we have seen this block. + self.network.blocks_seen.insert((block_hash, signature)); + } + trace!(target: "scroll::bridge::import", peer_id = %peer_id, block = ?block.hash_slow(), "Received new block from eth-wire protocol"); NewBlockWithPeer { peer_id, block, signature } } else { diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index 590d87e4..e3ee7773 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -4,7 +4,7 @@ use super::{ BlockImportOutcome, BlockValidation, NetworkHandleMessage, NetworkManagerEvent, NewBlockWithPeer, ScrollNetworkHandle, }; -use alloy_primitives::{FixedBytes, U128}; +use alloy_primitives::{FixedBytes, Signature, B256, U128}; use futures::{FutureExt, Stream, StreamExt}; use reth_eth_wire_types::NewBlock as EthWireNewBlock; use reth_network::{ @@ -42,7 +42,9 @@ pub struct ScrollNetworkManager { /// [`NetworkHandleMessage`]s. from_handle_rx: UnboundedReceiverStream, /// The scroll wire protocol manager. - scroll_wire: ScrollWireManager, + pub scroll_wire: ScrollWireManager, + /// The LRU cache used to track already seen (block,signature) pair. + pub blocks_seen: LruCache<(B256, Signature)>, } impl ScrollNetworkManager> { @@ -71,10 +73,12 @@ impl ScrollNetworkManager> { // Create the scroll-wire protocol manager. let scroll_wire = ScrollWireManager::new(events); + let blocks_seen = LruCache::new(LRU_CACHE_SIZE); + // Spawn the inner network manager. tokio::spawn(inner_network_manager); - Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire } + Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire, blocks_seen } } } @@ -92,7 +96,9 @@ impl> ScrollNetworkManager< let handle = ScrollNetworkHandle::new(to_manager_tx, inner_network_handle); - Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire } + let blocks_seen = LruCache::new(LRU_CACHE_SIZE); + + Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire, blocks_seen } } /// Returns a new [`ScrollNetworkHandle`] instance. @@ -134,11 +140,29 @@ impl> ScrollNetworkManager< } /// Handler for received events from the [`ScrollWireManager`]. - fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> NetworkManagerEvent { + fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> Option { match event { ScrollWireEvent::NewBlock { peer_id, block, signature } => { - trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block.hash_slow(), signature = ?signature, "Received new block"); - NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature }) + let block_hash = block.hash_slow(); + trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block"); + if self.blocks_seen.contains(&(block_hash, signature)) { + return None; + } else { + // Update the state of the peer cache i.e. peer has seen this block. + self.scroll_wire + .state_mut() + .entry(peer_id) + .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) + .insert(block_hash); + // Update the state of the block cache i.e. we have seen this block. + self.blocks_seen.insert((block.hash_slow(), signature)); + + Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer { + peer_id, + block, + signature, + })) + } } // Only `NewBlock` events are expected from the scroll-wire protocol. _ => { @@ -168,10 +192,11 @@ impl> ScrollNetworkManager< fn on_block_import_result(&mut self, outcome: BlockImportOutcome) { let BlockImportOutcome { peer, result } = outcome; match result { - Ok(BlockValidation::ValidBlock { new_block: msg }) | - Ok(BlockValidation::ValidHeader { new_block: msg }) => { + Ok(BlockValidation::ValidBlock { new_block: msg }) + | Ok(BlockValidation::ValidHeader { new_block: msg }) => { trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network"); let hash = msg.block.hash_slow(); + // Update the state of the peer cache i.e. peer has seen this block. self.scroll_wire .state_mut() .entry(peer) @@ -217,7 +242,9 @@ impl> Stream for ScrollNetw // Next we handle the scroll-wire events. if let Poll::Ready(event) = this.scroll_wire.poll_unpin(cx) { - return Poll::Ready(Some(this.on_scroll_wire_event(event))); + if let Some(event) = this.on_scroll_wire_event(event) { + return Poll::Ready(Some(event)); + } } Poll::Pending