Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use alloy_primitives::Signature;
use alloy_provider::Provider;
use futures::StreamExt;
use reth_chainspec::EthChainSpec;
use reth_network::cache::LruCache;
use reth_network_api::{block::NewBlockWithPeer as RethNewBlockWithPeer, FullNetwork};
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_scroll_primitives::ScrollBlock;
Expand All @@ -23,6 +24,7 @@ use scroll_engine::{EngineDriver, EngineDriverEvent, ForkchoiceState};
use scroll_network::{
BlockImportOutcome, NetworkManagerEvent, NewBlockWithPeer, ScrollNetworkManager,
};
use scroll_wire::LRU_CACHE_SIZE;
use std::{
fmt::{self, Debug, Formatter},
future::Future,
Expand Down Expand Up @@ -189,7 +191,7 @@ where
/// Returns a new event listener for the rollup node manager.
pub fn event_listener(&mut self) -> EventStream<RollupManagerEvent> {
if let Some(event_sender) = &self.event_sender {
return event_sender.new_listener()
return event_sender.new_listener();
};

let event_sender = EventSender::new(EVENT_CHANNEL_SIZE);
Expand Down Expand Up @@ -363,6 +365,21 @@ where
.checked_sub(ECDSA_SIGNATURE_LEN)
.and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
{
let block_hash = block.hash_slow();
if self.network.blocks_seen.contains(&(block_hash, signature)) {
return;
} else {
// Update the state of the peer cache i.e. peer has seen this block.
self.network
.scroll_wire
.state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
// Update the state of the block cache i.e. we have seen this block.
self.network.blocks_seen.insert((block_hash, signature));
}

trace!(target: "scroll::bridge::import", peer_id = %peer_id, block = ?block.hash_slow(), "Received new block from eth-wire protocol");
NewBlockWithPeer { peer_id, block, signature }
} else {
Expand Down
47 changes: 37 additions & 10 deletions crates/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{
BlockImportOutcome, BlockValidation, NetworkHandleMessage, NetworkManagerEvent,
NewBlockWithPeer, ScrollNetworkHandle,
};
use alloy_primitives::{FixedBytes, U128};
use alloy_primitives::{FixedBytes, Signature, B256, U128};
use futures::{FutureExt, Stream, StreamExt};
use reth_eth_wire_types::NewBlock as EthWireNewBlock;
use reth_network::{
Expand Down Expand Up @@ -42,7 +42,9 @@ pub struct ScrollNetworkManager<N> {
/// [`NetworkHandleMessage`]s.
from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage>,
/// The scroll wire protocol manager.
scroll_wire: ScrollWireManager,
pub scroll_wire: ScrollWireManager,
/// The LRU cache used to track already seen (block,signature) pair.
pub blocks_seen: LruCache<(B256, Signature)>,
}

impl ScrollNetworkManager<RethNetworkHandle<ScrollNetworkPrimitives>> {
Expand Down Expand Up @@ -71,10 +73,12 @@ impl ScrollNetworkManager<RethNetworkHandle<ScrollNetworkPrimitives>> {
// Create the scroll-wire protocol manager.
let scroll_wire = ScrollWireManager::new(events);

let blocks_seen = LruCache::new(LRU_CACHE_SIZE);

// Spawn the inner network manager.
tokio::spawn(inner_network_manager);

Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire }
Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire, blocks_seen }
}
}

Expand All @@ -92,7 +96,9 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ScrollNetworkManager<

let handle = ScrollNetworkHandle::new(to_manager_tx, inner_network_handle);

Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire }
let blocks_seen = LruCache::new(LRU_CACHE_SIZE);

Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire, blocks_seen }
}

/// Returns a new [`ScrollNetworkHandle`] instance.
Expand Down Expand Up @@ -134,11 +140,29 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ScrollNetworkManager<
}

/// Handler for received events from the [`ScrollWireManager`].
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> NetworkManagerEvent {
fn on_scroll_wire_event(&mut self, event: ScrollWireEvent) -> Option<NetworkManagerEvent> {
match event {
ScrollWireEvent::NewBlock { peer_id, block, signature } => {
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block.hash_slow(), signature = ?signature, "Received new block");
NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature })
let block_hash = block.hash_slow();
trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block");
if self.blocks_seen.contains(&(block_hash, signature)) {
return None;
} else {
// Update the state of the peer cache i.e. peer has seen this block.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any risk marking a peer as having seen this block at this stage @frisitano?

self.scroll_wire
.state_mut()
.entry(peer_id)
.or_insert_with(|| LruCache::new(LRU_CACHE_SIZE))
.insert(block_hash);
// Update the state of the block cache i.e. we have seen this block.
self.blocks_seen.insert((block.hash_slow(), signature));

Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer {
peer_id,
block,
signature,
}))
}
}
// Only `NewBlock` events are expected from the scroll-wire protocol.
_ => {
Expand Down Expand Up @@ -168,10 +192,11 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ScrollNetworkManager<
fn on_block_import_result(&mut self, outcome: BlockImportOutcome) {
let BlockImportOutcome { peer, result } = outcome;
match result {
Ok(BlockValidation::ValidBlock { new_block: msg }) |
Ok(BlockValidation::ValidHeader { new_block: msg }) => {
Ok(BlockValidation::ValidBlock { new_block: msg })
| Ok(BlockValidation::ValidHeader { new_block: msg }) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll need to run the linter with rust nightly to accomodate to our linting rules.

trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network");
let hash = msg.block.hash_slow();
// Update the state of the peer cache i.e. peer has seen this block.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this here? If we marked the peer already just after we received the block this shouldn't be necessary? @frisitano to confirm

self.scroll_wire
.state_mut()
.entry(peer)
Expand Down Expand Up @@ -217,7 +242,9 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> Stream for ScrollNetw

// Next we handle the scroll-wire events.
if let Poll::Ready(event) = this.scroll_wire.poll_unpin(cx) {
return Poll::Ready(Some(this.on_scroll_wire_event(event)));
if let Some(event) = this.on_scroll_wire_event(event) {
return Poll::Ready(Some(event));
}
}

Poll::Pending
Expand Down
Loading