From 4f6fde66e578900752334473b0bb0f948fdbd32f Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Tue, 22 Jul 2025 20:13:19 +0800 Subject: [PATCH 01/10] Add NetworkHandle to RollupManagerCommand --- crates/manager/src/manager/command.rs | 8 +++++++- crates/manager/src/manager/handle.rs | 19 ++++++++++++++----- crates/manager/src/manager/mod.rs | 8 ++++++-- crates/node/src/add_ons/handle.rs | 8 +++++--- crates/node/src/add_ons/rollup.rs | 2 +- crates/node/src/args.rs | 2 +- 6 files changed, 34 insertions(+), 13 deletions(-) diff --git a/crates/manager/src/manager/command.rs b/crates/manager/src/manager/command.rs index 2b42fc8c..2ecfa33c 100644 --- a/crates/manager/src/manager/command.rs +++ b/crates/manager/src/manager/command.rs @@ -2,14 +2,20 @@ use super::{RollupManagerEvent, RollupManagerStatus}; use reth_tokio_util::EventStream; use tokio::sync::oneshot; +use scroll_network::ScrollNetworkHandle; +use reth_network_api::FullNetwork; +use reth_scroll_node::ScrollNetworkPrimitives; + /// The commands that can be sent to the rollup manager. #[derive(Debug)] -pub enum RollupManagerCommand { +pub enum RollupManagerCommand> { /// Command to build a new block. BuildBlock, /// Returns an event stream for rollup manager events. EventListener(oneshot::Sender>), /// Report the current status of the manager via the oneshot channel. Status(oneshot::Sender), + + NetworkHandle(oneshot::Sender>), } diff --git a/crates/manager/src/manager/handle.rs b/crates/manager/src/manager/handle.rs index 0651add0..1f883fe6 100644 --- a/crates/manager/src/manager/handle.rs +++ b/crates/manager/src/manager/handle.rs @@ -1,22 +1,25 @@ +use reth_network_api::FullNetwork; +use reth_scroll_node::ScrollNetworkPrimitives; use super::{RollupManagerCommand, RollupManagerEvent}; use reth_tokio_util::EventStream; use tokio::sync::{mpsc, oneshot}; +use scroll_network::ScrollNetworkHandle; /// The handle used to send commands to the rollup manager. #[derive(Debug, Clone)] -pub struct RollupManagerHandle { +pub struct RollupManagerHandle> { /// The channel used to send commands to the rollup manager. - to_manager_tx: mpsc::Sender, + to_manager_tx: mpsc::Sender>, } -impl RollupManagerHandle { +impl > RollupManagerHandle { /// Create a new rollup manager handle. - pub const fn new(to_manager_tx: mpsc::Sender) -> Self { + pub const fn new(to_manager_tx: mpsc::Sender>) -> Self { Self { to_manager_tx } } /// Sends a command to the rollup manager. - pub async fn send_command(&self, command: RollupManagerCommand) { + pub async fn send_command(&self, command: RollupManagerCommand) { let _ = self.to_manager_tx.send(command).await; } @@ -25,6 +28,12 @@ impl RollupManagerHandle { self.send_command(RollupManagerCommand::BuildBlock).await; } + pub async fn get_network_handle(&self) -> Result, oneshot::error::RecvError> { + let (tx, rx) = oneshot::channel(); + self.send_command(RollupManagerCommand::NetworkHandle(tx)).await; + rx.await + } + /// Sends a command to the rollup manager to fetch an event listener for the rollup node /// manager. pub async fn get_event_listener( diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index ddd301e3..e30ca32f 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -78,7 +78,7 @@ pub struct RollupNodeManager< CS, > { /// The handle receiver used to receive commands. - handle_rx: Receiver, + handle_rx: Receiver>, /// The chain spec used by the rollup node. chain_spec: Arc, /// The network manager that manages the scroll p2p network. @@ -164,7 +164,7 @@ where sequencer: Option>, signer: Option, block_time: Option, - ) -> (Self, RollupManagerHandle) { + ) -> (Self, RollupManagerHandle) { let (handle_tx, handle_rx) = mpsc::channel(EVENT_CHANNEL_SIZE); let indexer = Indexer::new(database.clone(), chain_spec.clone()); let derivation_pipeline = DerivationPipeline::new(l1_provider, database); @@ -450,6 +450,10 @@ where RollupManagerCommand::Status(tx) => { tx.send(this.status()).expect("Failed to send status to handle"); } + RollupManagerCommand::NetworkHandle(tx) => { + let network_handle = this.network.handle(); + tx.send(network_handle.clone()).expect("Failed to send network handle to handle"); + } } } diff --git a/crates/node/src/add_ons/handle.rs b/crates/node/src/add_ons/handle.rs index e5a37480..6967c0e9 100644 --- a/crates/node/src/add_ons/handle.rs +++ b/crates/node/src/add_ons/handle.rs @@ -1,15 +1,17 @@ +use reth_network_api::FullNetwork; use reth_node_api::FullNodeComponents; use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider}; use reth_rpc_eth_api::EthApiTypes; +use reth_scroll_node::ScrollNetworkPrimitives; use rollup_node_manager::RollupManagerHandle; #[cfg(feature = "test-utils")] use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender}; /// A handle for scroll addons, which includes handles for the rollup manager and RPC server. #[derive(Debug, Clone)] -pub struct ScrollAddOnsHandle { +pub struct ScrollAddOnsHandle>, EthApi: EthApiTypes> { /// The handle used to send commands to the rollup manager. - pub rollup_manager_handle: RollupManagerHandle, + pub rollup_manager_handle: RollupManagerHandle, /// The handle used to send commands to the RPC server. pub rpc_handle: RpcHandle, /// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`. @@ -17,7 +19,7 @@ pub struct ScrollAddOnsHandle { pub l1_watcher_tx: Option>>, } -impl RpcHandleProvider +impl>, EthApi: EthApiTypes> RpcHandleProvider for ScrollAddOnsHandle { fn rpc_handle(&self) -> &RpcHandle { diff --git a/crates/node/src/add_ons/rollup.rs b/crates/node/src/add_ons/rollup.rs index aed820ee..0c49654c 100644 --- a/crates/node/src/add_ons/rollup.rs +++ b/crates/node/src/add_ons/rollup.rs @@ -50,7 +50,7 @@ impl RollupManagerAddOn { self, ctx: AddOnsContext<'_, N>, rpc: RpcHandle, - ) -> eyre::Result<(RollupManagerHandle, Option>>)> + ) -> eyre::Result<(RollupManagerHandle, Option>>)> where <::Types as NodeTypes>::ChainSpec: ScrollHardforks + IsDevChain, N::Network: NetworkProtocols + FullNetwork, diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 981f4e2e..b223b140 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -113,7 +113,7 @@ impl ScrollRollupNodeConfig { impl L1MessageProvider, impl ScrollHardforks + EthChainSpec + IsDevChain + Clone + 'static, >, - RollupManagerHandle, + RollupManagerHandle, Option>>, )> { // Instantiate the network manager From a6c531ed62d53e22186384ceee15f19cf5fb296e Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 1 Aug 2025 10:33:37 +0800 Subject: [PATCH 02/10] finish can_penalize_peer_for_invalid_block test --- crates/engine/src/driver.rs | 6 +- crates/engine/src/future/result.rs | 1 + crates/manager/src/manager/command.rs | 9 ++- crates/manager/src/manager/handle.rs | 13 ++-- crates/manager/src/manager/mod.rs | 5 +- crates/network/src/manager.rs | 4 +- crates/node/tests/e2e.rs | 98 ++++++++++++++++++++++++++- 7 files changed, 116 insertions(+), 20 deletions(-) diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index 73d97833..e18ef510 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -186,7 +186,7 @@ where self.metrics.block_import_duration.record(duration.as_secs_f64()); // Return the block import outcome - return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome) + return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to import block"); @@ -213,7 +213,7 @@ where // record the metric. self.metrics.l1_consolidation_duration.record(duration.as_secs_f64()); - return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)) + return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to consolidate block derived from L1") @@ -234,7 +234,7 @@ where self.metrics.build_new_payload_duration.record(duration.as_secs_f64()); self.metrics.gas_per_block.record(block.gas_used as f64); - return Some(EngineDriverEvent::NewPayload(block)) + return Some(EngineDriverEvent::NewPayload(block)); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to build new payload"); diff --git a/crates/engine/src/future/result.rs b/crates/engine/src/future/result.rs index ffd113ba..8647a56d 100644 --- a/crates/engine/src/future/result.rs +++ b/crates/engine/src/future/result.rs @@ -1,5 +1,6 @@ use super::*; +#[derive(Debug)] /// A type that represents the result of the engine driver future. pub(crate) enum EngineDriverFutureResult { BlockImport( diff --git a/crates/manager/src/manager/command.rs b/crates/manager/src/manager/command.rs index 2ecfa33c..353107fe 100644 --- a/crates/manager/src/manager/command.rs +++ b/crates/manager/src/manager/command.rs @@ -1,11 +1,10 @@ use super::{RollupManagerEvent, RollupManagerStatus}; -use reth_tokio_util::EventStream; -use tokio::sync::oneshot; -use scroll_network::ScrollNetworkHandle; use reth_network_api::FullNetwork; use reth_scroll_node::ScrollNetworkPrimitives; - +use reth_tokio_util::EventStream; +use scroll_network::ScrollNetworkHandle; +use tokio::sync::oneshot; /// The commands that can be sent to the rollup manager. #[derive(Debug)] @@ -16,6 +15,6 @@ pub enum RollupManagerCommand>), /// Report the current status of the manager via the oneshot channel. Status(oneshot::Sender), - + /// Returns the network handle. NetworkHandle(oneshot::Sender>), } diff --git a/crates/manager/src/manager/handle.rs b/crates/manager/src/manager/handle.rs index 1f883fe6..bc578c04 100644 --- a/crates/manager/src/manager/handle.rs +++ b/crates/manager/src/manager/handle.rs @@ -1,18 +1,18 @@ +use super::{RollupManagerCommand, RollupManagerEvent}; use reth_network_api::FullNetwork; use reth_scroll_node::ScrollNetworkPrimitives; -use super::{RollupManagerCommand, RollupManagerEvent}; use reth_tokio_util::EventStream; -use tokio::sync::{mpsc, oneshot}; use scroll_network::ScrollNetworkHandle; +use tokio::sync::{mpsc, oneshot}; /// The handle used to send commands to the rollup manager. #[derive(Debug, Clone)] -pub struct RollupManagerHandle> { +pub struct RollupManagerHandle> { /// The channel used to send commands to the rollup manager. to_manager_tx: mpsc::Sender>, } -impl > RollupManagerHandle { +impl> RollupManagerHandle { /// Create a new rollup manager handle. pub const fn new(to_manager_tx: mpsc::Sender>) -> Self { Self { to_manager_tx } @@ -28,7 +28,10 @@ impl > RollupManagerHandle Result, oneshot::error::RecvError> { + /// Sends a command to the rollup manager to get the network handle. + pub async fn get_network_handle( + &self, + ) -> Result, oneshot::error::RecvError> { let (tx, rx) = oneshot::channel(); self.send_command(RollupManagerCommand::NetworkHandle(tx)).await; rx.await diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index e30ca32f..559e51e2 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -189,7 +189,7 @@ where /// Returns a new event listener for the rollup node manager. pub fn event_listener(&mut self) -> EventStream { if let Some(event_sender) = &self.event_sender { - return event_sender.new_listener() + return event_sender.new_listener(); }; let event_sender = EventSender::new(EVENT_CHANNEL_SIZE); @@ -452,7 +452,8 @@ where } RollupManagerCommand::NetworkHandle(tx) => { let network_handle = this.network.handle(); - tx.send(network_handle.clone()).expect("Failed to send network handle to handle"); + tx.send(network_handle.clone()) + .expect("Failed to send network handle to handle"); } } } diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index 7c10ceb5..c3eb140f 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -159,8 +159,8 @@ impl ScrollNetworkManager { fn on_block_import_result(&mut self, outcome: BlockImportOutcome) { let BlockImportOutcome { peer, result } = outcome; match result { - Ok(BlockValidation::ValidBlock { new_block: msg }) | - Ok(BlockValidation::ValidHeader { new_block: msg }) => { + Ok(BlockValidation::ValidBlock { new_block: msg }) + | Ok(BlockValidation::ValidHeader { new_block: msg }) => { trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network"); let hash = msg.block.hash_slow(); self.scroll_wire diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 80d6aa9f..ce462bc2 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -3,11 +3,12 @@ use alloy_eips::BlockNumberOrTag; use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256}; use futures::StreamExt; -use reth_network::{NetworkConfigBuilder, PeersInfo}; +use reth_network::{NetworkConfigBuilder, Peers, PeersInfo}; use reth_rpc_api::EthApiServer; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::ScrollNetworkPrimitives; use reth_tokio_util::EventStream; +use reth_scroll_primitives::ScrollBlock; use rollup_node::{ test_utils::{ default_sequencer_test_scroll_rollup_node_config, default_test_scroll_rollup_node_config, @@ -26,6 +27,15 @@ use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET}; use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler}; use std::{path::PathBuf, sync::Arc}; use tokio::sync::{oneshot, Mutex}; +use scroll_wire::ScrollWireConfig; +use std::{ + path::PathBuf, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; +use tokio::sync::Mutex; +use tokio::time; use tracing::trace; #[tokio::test] @@ -57,7 +67,7 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> { let (mut nodes, _tasks, _wallet) = setup_engine(node_args, 1, chain_spec, false, false).await?; let node = nodes.pop().unwrap(); - let rnm_handle: RollupManagerHandle = node.inner.add_ons_handle.rollup_manager_handle.clone(); + let rnm_handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); let mut rnm_events = rnm_handle.get_event_listener().await?; let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); @@ -164,6 +174,88 @@ async fn can_sequence_and_gossip_blocks() { } } +#[tokio::test] +async fn can_penalize_peer_for_invalid_block() { + reth_tracing::init_test_tracing(); + + // create 2 nodes + let chain_spec = (*SCROLL_DEV).clone(); + let rollup_manager_args = ScrollRollupNodeConfig { + test: true, + network_args: ScrollNetworkArgs { + enable_eth_scroll_wire_bridge: true, + enable_scroll_wire: true, + }, + database_args: DatabaseArgs { path: Some(PathBuf::from("sqlite::memory:")) }, + l1_provider_args: L1ProviderArgs::default(), + engine_driver_args: EngineDriverArgs::default(), + sequencer_args: SequencerArgs { + sequencer_enabled: true, + block_time: 0, + max_l1_messages_per_block: 4, + l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0), + payload_building_duration: 1000, + ..SequencerArgs::default() + }, + beacon_provider_args: BeaconProviderArgs::default(), + signer_args: Default::default(), + }; + + let (nodes, _tasks, _) = setup_engine(rollup_manager_args, 2, chain_spec, false).await.unwrap(); + + let node0_rmn_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); + let node0_network_handle = node0_rmn_handle.get_network_handle().await.unwrap(); + let node0_id = node0_network_handle.inner().peer_id(); + + let node1_rnm_handle = nodes[1].inner.add_ons_handle.rollup_manager_handle.clone(); + let node1_network_handle = node1_rnm_handle.get_network_handle().await.unwrap(); + + // get initial reputation of node0 from pov of node1 + let initial_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + assert_eq!(initial_reputation, 0); + + // create invalid block + let block = ScrollBlock::default(); + + // send invalid block from node0 to node1. We don't care about the signature here since we use a NoopConsensus in the test. + node0_network_handle.announce_block(block, Signature::new(U256::from(1), U256::from(1), false)); + + eventually( + Duration::from_secs(100), + Duration::from_millis(10000), + "Peer0 reputation should be lower after sending invalid block", + || async { + // check that the node0 is penalized on node1 + let slashed_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + slashed_reputation < initial_reputation + }, + ) + .await; +} + +/// Helper function to wait until a predicate is true or a timeout occurs. +pub async fn eventually(timeout: Duration, tick: Duration, message: &str, mut predicate: F) +where + F: FnMut() -> Fut, + Fut: std::future::Future, +{ + let mut interval = time::interval(tick); + let start = time::Instant::now(); + loop { + if predicate().await { + return; + } + + if start.elapsed() > timeout { + panic!("Timeout while waiting for condition: {}", message); + } + + interval.tick().await; + } +} + #[allow(clippy::large_stack_frames)] #[tokio::test] async fn can_sequence_and_gossip_transactions() { @@ -529,7 +621,7 @@ async fn graceful_shutdown_consolidates_most_recent_batch_on_startup() -> eyre:: }; if block_info.number == 4 { - break + break; }; i += 1; } From a953318b0e95020d77f9b5e50373eee16e4cdedc Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 1 Aug 2025 11:24:40 +0800 Subject: [PATCH 03/10] fix test after merge --- crates/node/tests/e2e.rs | 44 +++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index ce462bc2..10a29e8d 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -7,8 +7,8 @@ use reth_network::{NetworkConfigBuilder, Peers, PeersInfo}; use reth_rpc_api::EthApiServer; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::ScrollNetworkPrimitives; -use reth_tokio_util::EventStream; use reth_scroll_primitives::ScrollBlock; +use reth_tokio_util::EventStream; use rollup_node::{ test_utils::{ default_sequencer_test_scroll_rollup_node_config, default_test_scroll_rollup_node_config, @@ -17,7 +17,7 @@ use rollup_node::{ BeaconProviderArgs, DatabaseArgs, EngineDriverArgs, GasPriceOracleArgs, L1ProviderArgs, NetworkArgs as ScrollNetworkArgs, ScrollRollupNodeConfig, SequencerArgs, }; -use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent, RollupManagerHandle}; +use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent}; use rollup_node_primitives::BatchCommitData; use rollup_node_providers::BlobSource; use rollup_node_sequencer::L1MessageInclusionMode; @@ -25,16 +25,8 @@ use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET}; use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler}; -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, sync::Arc, time::Duration}; use tokio::sync::{oneshot, Mutex}; -use scroll_wire::ScrollWireConfig; -use std::{ - path::PathBuf, - sync::Arc, - task::{Context, Poll}, - time::Duration, -}; -use tokio::sync::Mutex; use tokio::time; use tracing::trace; @@ -185,6 +177,7 @@ async fn can_penalize_peer_for_invalid_block() { network_args: ScrollNetworkArgs { enable_eth_scroll_wire_bridge: true, enable_scroll_wire: true, + sequencer_url: None, }, database_args: DatabaseArgs { path: Some(PathBuf::from("sqlite::memory:")) }, l1_provider_args: L1ProviderArgs::default(), @@ -197,11 +190,16 @@ async fn can_penalize_peer_for_invalid_block() { payload_building_duration: 1000, ..SequencerArgs::default() }, - beacon_provider_args: BeaconProviderArgs::default(), + beacon_provider_args: BeaconProviderArgs { + blob_source: BlobSource::Mock, + ..Default::default() + }, signer_args: Default::default(), + gas_price_oracle_args: GasPriceOracleArgs::default(), }; - let (nodes, _tasks, _) = setup_engine(rollup_manager_args, 2, chain_spec, false).await.unwrap(); + let (nodes, _tasks, _) = + setup_engine(rollup_manager_args, 2, chain_spec, false, false).await.unwrap(); let node0_rmn_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); let node0_network_handle = node0_rmn_handle.get_network_handle().await.unwrap(); @@ -322,7 +320,7 @@ async fn can_sequence_and_gossip_transactions() { if let RollupManagerEvent::BlockSequenced(block) = e { assert_eq!(block.header.number, 2); assert_eq!(block.body.transactions.len(), 1); - return true + return true; } false }, @@ -415,7 +413,7 @@ async fn can_forward_tx_to_sequencer() { if let RollupManagerEvent::BlockSequenced(block) = e { assert_eq!(block.header.number, 2); assert_eq!(block.body.transactions.len(), 1); - return true + return true; } false }, @@ -820,7 +818,7 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { rnm_events.next().await { if consolidation_outcome.block_info().block_info.number == 4 { - break + break; } } } @@ -834,7 +832,7 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { rnm_events.next().await { if consolidation_outcome.block_info().block_info.number == 46 { - break + break; } } } @@ -928,7 +926,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { handle.build_block().await; if let Some(RollupManagerEvent::BlockSequenced(_)) = rnm_events.next().await { if i == 10 { - break + break; } i += 1; } @@ -941,7 +939,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { loop { if let Some(RollupManagerEvent::L1MessageIndexed(index)) = rnm_events.next().await { assert_eq!(index, 0); - break + break; } } l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; @@ -953,7 +951,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { if block.body.transactions.iter().any(|tx| tx.is_l1_message()) { l2_reorged_height = block.header.number; - break + break; } } } @@ -963,7 +961,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { loop { if let Some(RollupManagerEvent::Reorg(height)) = rnm_events.next().await { assert_eq!(height, 9); - break + break; } } @@ -972,7 +970,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { loop { if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { assert_eq!(block.number, l2_reorged_height); - break + break; } } @@ -996,7 +994,7 @@ async fn wait_n_events( n -= 1; } if n == 0 { - break + break; } } } From 319e7a9cf6e1f43fb752711033b2895de3b754c2 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 1 Aug 2025 11:40:49 +0800 Subject: [PATCH 04/10] fix linter --- crates/node/src/add_ons/handle.rs | 11 ++++++++--- crates/node/tests/e2e.rs | 13 +++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/node/src/add_ons/handle.rs b/crates/node/src/add_ons/handle.rs index 6967c0e9..757e7931 100644 --- a/crates/node/src/add_ons/handle.rs +++ b/crates/node/src/add_ons/handle.rs @@ -9,7 +9,10 @@ use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sen /// A handle for scroll addons, which includes handles for the rollup manager and RPC server. #[derive(Debug, Clone)] -pub struct ScrollAddOnsHandle>, EthApi: EthApiTypes> { +pub struct ScrollAddOnsHandle< + Node: FullNodeComponents>, + EthApi: EthApiTypes, +> { /// The handle used to send commands to the rollup manager. pub rollup_manager_handle: RollupManagerHandle, /// The handle used to send commands to the RPC server. @@ -19,8 +22,10 @@ pub struct ScrollAddOnsHandle>>, } -impl>, EthApi: EthApiTypes> RpcHandleProvider - for ScrollAddOnsHandle +impl< + Node: FullNodeComponents>, + EthApi: EthApiTypes, + > RpcHandleProvider for ScrollAddOnsHandle { fn rpc_handle(&self) -> &RpcHandle { &self.rpc_handle diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 10a29e8d..e444a2c3 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -26,8 +26,10 @@ use scroll_alloy_consensus::TxL1Message; use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET}; use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler}; use std::{path::PathBuf, sync::Arc, time::Duration}; -use tokio::sync::{oneshot, Mutex}; -use tokio::time; +use tokio::{ + sync::{oneshot, Mutex}, + time, +}; use tracing::trace; #[tokio::test] @@ -216,7 +218,8 @@ async fn can_penalize_peer_for_invalid_block() { // create invalid block let block = ScrollBlock::default(); - // send invalid block from node0 to node1. We don't care about the signature here since we use a NoopConsensus in the test. + // send invalid block from node0 to node1. We don't care about the signature here since we use a + // NoopConsensus in the test. node0_network_handle.announce_block(block, Signature::new(U256::from(1), U256::from(1), false)); eventually( @@ -246,9 +249,7 @@ where return; } - if start.elapsed() > timeout { - panic!("Timeout while waiting for condition: {}", message); - } + assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}"); interval.tick().await; } From d3d743b0701a0992e1281c07e42f647ee53f1d1d Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 1 Aug 2025 12:11:37 +0800 Subject: [PATCH 05/10] fix linter --- crates/network/src/manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index c3eb140f..7c10ceb5 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -159,8 +159,8 @@ impl ScrollNetworkManager { fn on_block_import_result(&mut self, outcome: BlockImportOutcome) { let BlockImportOutcome { peer, result } = outcome; match result { - Ok(BlockValidation::ValidBlock { new_block: msg }) - | Ok(BlockValidation::ValidHeader { new_block: msg }) => { + Ok(BlockValidation::ValidBlock { new_block: msg }) | + Ok(BlockValidation::ValidHeader { new_block: msg }) => { trace!(target: "scroll::network::manager", peer_id = ?peer, block = ?msg.block, "Block import successful - announcing block to network"); let hash = msg.block.hash_slow(); self.scroll_wire From 320770d1e88960fc70b2be655ba1e0e48f7157e1 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Fri, 1 Aug 2025 14:45:30 +0800 Subject: [PATCH 06/10] remove unrelated formatting changes --- crates/engine/src/driver.rs | 6 +++--- crates/manager/src/manager/mod.rs | 2 +- crates/node/tests/e2e.rs | 22 +++++++++++----------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index e18ef510..73d97833 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -186,7 +186,7 @@ where self.metrics.block_import_duration.record(duration.as_secs_f64()); // Return the block import outcome - return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome); + return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome) } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to import block"); @@ -213,7 +213,7 @@ where // record the metric. self.metrics.l1_consolidation_duration.record(duration.as_secs_f64()); - return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)); + return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)) } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to consolidate block derived from L1") @@ -234,7 +234,7 @@ where self.metrics.build_new_payload_duration.record(duration.as_secs_f64()); self.metrics.gas_per_block.record(block.gas_used as f64); - return Some(EngineDriverEvent::NewPayload(block)); + return Some(EngineDriverEvent::NewPayload(block)) } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to build new payload"); diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 559e51e2..9f1c5dc7 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -189,7 +189,7 @@ where /// Returns a new event listener for the rollup node manager. pub fn event_listener(&mut self) -> EventStream { if let Some(event_sender) = &self.event_sender { - return event_sender.new_listener(); + return event_sender.new_listener() }; let event_sender = EventSender::new(EVENT_CHANNEL_SIZE); diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index e444a2c3..9ff635c4 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -321,7 +321,7 @@ async fn can_sequence_and_gossip_transactions() { if let RollupManagerEvent::BlockSequenced(block) = e { assert_eq!(block.header.number, 2); assert_eq!(block.body.transactions.len(), 1); - return true; + return true } false }, @@ -414,7 +414,7 @@ async fn can_forward_tx_to_sequencer() { if let RollupManagerEvent::BlockSequenced(block) = e { assert_eq!(block.header.number, 2); assert_eq!(block.body.transactions.len(), 1); - return true; + return true } false }, @@ -620,7 +620,7 @@ async fn graceful_shutdown_consolidates_most_recent_batch_on_startup() -> eyre:: }; if block_info.number == 4 { - break; + break }; i += 1; } @@ -819,7 +819,7 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { rnm_events.next().await { if consolidation_outcome.block_info().block_info.number == 4 { - break; + break } } } @@ -833,7 +833,7 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { rnm_events.next().await { if consolidation_outcome.block_info().block_info.number == 46 { - break; + break } } } @@ -927,7 +927,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { handle.build_block().await; if let Some(RollupManagerEvent::BlockSequenced(_)) = rnm_events.next().await { if i == 10 { - break; + break } i += 1; } @@ -940,7 +940,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { loop { if let Some(RollupManagerEvent::L1MessageIndexed(index)) = rnm_events.next().await { assert_eq!(index, 0); - break; + break } } l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; @@ -952,7 +952,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { if block.body.transactions.iter().any(|tx| tx.is_l1_message()) { l2_reorged_height = block.header.number; - break; + break } } } @@ -962,7 +962,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { loop { if let Some(RollupManagerEvent::Reorg(height)) = rnm_events.next().await { assert_eq!(height, 9); - break; + break } } @@ -971,7 +971,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { loop { if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { assert_eq!(block.number, l2_reorged_height); - break; + break } } @@ -995,7 +995,7 @@ async fn wait_n_events( n -= 1; } if n == 0 { - break; + break } } } From 0c3388e35edae8caec0991483c156a3c2ec93fe4 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 7 Aug 2025 09:28:01 +0800 Subject: [PATCH 07/10] fix after merge --- crates/node/tests/e2e.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 0d35101d..f4d66542 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -202,6 +202,7 @@ async fn can_penalize_peer_for_invalid_block() { }, signer_args: Default::default(), gas_price_oracle_args: GasPriceOracleArgs::default(), + consensus_args: ConsensusArgs::noop(), }; let (nodes, _tasks, _) = From 8ed6d71f2636bfeb9eb0f5c85cc9c6ab027726ed Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 7 Aug 2025 09:28:46 +0800 Subject: [PATCH 08/10] address review comments --- crates/engine/src/future/result.rs | 2 +- crates/node/tests/e2e.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/engine/src/future/result.rs b/crates/engine/src/future/result.rs index 8647a56d..b156ae76 100644 --- a/crates/engine/src/future/result.rs +++ b/crates/engine/src/future/result.rs @@ -1,7 +1,7 @@ use super::*; -#[derive(Debug)] /// A type that represents the result of the engine driver future. +#[derive(Debug)] pub(crate) enum EngineDriverFutureResult { BlockImport( Result< diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index f4d66542..d64b2d91 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -228,8 +228,8 @@ async fn can_penalize_peer_for_invalid_block() { node0_network_handle.announce_block(block, Signature::new(U256::from(1), U256::from(1), false)); eventually( - Duration::from_secs(100), - Duration::from_millis(10000), + Duration::from_secs(5), + Duration::from_millis(10), "Peer0 reputation should be lower after sending invalid block", || async { // check that the node0 is penalized on node1 From 050d2b889bce1af2725ec5429de4b9f5dc9a09d4 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Mon, 11 Aug 2025 06:17:41 +0800 Subject: [PATCH 09/10] Fixes after merge --- crates/node/tests/e2e.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index bb161f17..0a3d11e6 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -190,7 +190,6 @@ async fn can_penalize_peer_for_invalid_block() { sequencer_args: SequencerArgs { sequencer_enabled: true, block_time: 0, - max_l1_messages_per_block: 4, l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0), payload_building_duration: 1000, ..SequencerArgs::default() From e8d1c33e4cde94ef48ac952b5d1cc0758f746021 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Mon, 11 Aug 2025 09:17:30 +0800 Subject: [PATCH 10/10] implement test can_penalize_peer_for_invalid_signature --- crates/node/src/args.rs | 10 ++- crates/node/tests/e2e.rs | 158 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 4 deletions(-) diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 58bdf228..40151a7d 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -294,13 +294,15 @@ impl ScrollRollupNodeConfig { .then_some(network.eth_wire_block_listener().await?); // Instantiate the signer - let signer = if self.test { + let chain_id = chain_spec.chain().id(); + let signer = if let Some(configured_signer) = self.signer_args.signer(chain_id).await? { + // Use the signer configured by SignerArgs + Some(rollup_node_signer::Signer::spawn(configured_signer)) + } else if self.test { // Use a random private key signer for testing Some(rollup_node_signer::Signer::spawn(PrivateKeySigner::random())) } else { - // Use the signer configured by SignerArgs - let chain_id = chain_spec.chain().id(); - self.signer_args.signer(chain_id).await?.map(rollup_node_signer::Signer::spawn) + None }; // Spawn the rollup node manager diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 0a3d11e6..43321f05 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -239,6 +239,164 @@ async fn can_penalize_peer_for_invalid_block() { .await; } +/// Tests that peers are penalized for broadcasting blocks with invalid signatures. +/// +/// This test verifies the network's ability to detect and penalize peers that send +/// blocks with either unauthorized or malformed signatures when using the `SystemContract` +/// consensus algorithm. +/// +/// The test proceeds in three phases: +/// 1. **Valid signature verification**: Confirms that blocks signed by the authorized signer are +/// accepted and processed normally without peer penalization. +/// 2. **Unauthorized signer detection**: Sends a block signed by an unauthorized signer and +/// verifies that the sending peer's reputation is decreased. +/// 3. **Invalid signature detection**: Sends a block with a malformed signature and verifies +/// further reputation decrease or peer disconnection. +#[tokio::test] +async fn can_penalize_peer_for_invalid_signature() { + reth_tracing::init_test_tracing(); + + let chain_spec = (*SCROLL_DEV).clone(); + + // Create two signers - one authorized and one unauthorized + let authorized_signer = PrivateKeySigner::random().with_chain_id(Some(chain_spec.chain().id())); + let authorized_address = authorized_signer.address(); + let unauthorized_signer = + PrivateKeySigner::random().with_chain_id(Some(chain_spec.chain().id())); + + let mut test_config = default_sequencer_test_scroll_rollup_node_config(); + test_config.consensus_args.algorithm = ConsensusAlgorithm::SystemContract; + test_config.consensus_args.authorized_signer = Some(authorized_address); + test_config.signer_args.private_key = Some(authorized_signer.clone()); + + // Setup nodes + let (mut nodes, _tasks, _) = + setup_engine(test_config, 2, chain_spec.clone(), false, false).await.unwrap(); + + let node0 = nodes.remove(0); + let node1 = nodes.remove(0); + + // Get handles + let node0_rmn_handle = node0.inner.add_ons_handle.rollup_manager_handle.clone(); + let node0_network_handle = node0_rmn_handle.get_network_handle().await.unwrap(); + let node0_id = node0_network_handle.inner().peer_id(); + + let node1_rnm_handle = node1.inner.add_ons_handle.rollup_manager_handle.clone(); + let node1_network_handle = node1_rnm_handle.get_network_handle().await.unwrap(); + + // Get event streams + let mut node0_events = node0_rmn_handle.get_event_listener().await.unwrap(); + let mut node1_events = node1_rnm_handle.get_event_listener().await.unwrap(); + + // === Phase 1: Test valid block with correct signature === + + // Have the legitimate sequencer build and sign a block + node0_rmn_handle.build_block().await; + + // Wait for the sequencer to build the block + let block0 = if let Some(RollupManagerEvent::BlockSequenced(block)) = node0_events.next().await + { + assert_eq!(block.body.transactions.len(), 0, "Block should have no transactions"); + block + } else { + panic!("Failed to receive block from sequencer"); + }; + + // Node1 should receive and accept the valid block + if let Some(RollupManagerEvent::NewBlockReceived(block_with_peer)) = node1_events.next().await { + assert_eq!(block0.hash_slow(), block_with_peer.block.hash_slow()); + + // Verify the signature is from the authorized signer + let hash = sig_encode_hash(&block_with_peer.block); + let recovered = block_with_peer.signature.recover_address_from_prehash(&hash).unwrap(); + assert_eq!(recovered, authorized_address, "Block should be signed by authorized signer"); + } else { + panic!("Failed to receive valid block at follower"); + } + + // Wait for successful import + wait_n_events(&mut node1_events, |e| matches!(e, RollupManagerEvent::BlockImported(_)), 1) + .await; + + // === Phase 2: Create and send valid block with unauthorized signer signature === + + // Get initial reputation of node0 from node1's perspective + let initial_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + assert_eq!(initial_reputation, 0, "Initial reputation should be zero"); + + // Create a new block manually (we'll reuse the valid block structure but with wrong signature) + let mut block1 = block0.clone(); + block1.header.number += 1; + block1.header.parent_hash = block0.hash_slow(); + block1.header.timestamp += 1; + + // Sign the block with the unauthorized signer + let block_hash = sig_encode_hash(&block1); + let unauthorized_signature = unauthorized_signer.sign_hash(&block_hash).await.unwrap(); + + // Send the block with invalid signature from node0 to node1 + node0_network_handle.announce_block(block1.clone(), unauthorized_signature); + + // Node1 should receive and process the invalid block + if let Some(RollupManagerEvent::NewBlockReceived(block_with_peer)) = node1_events.next().await { + assert_eq!(block1.hash_slow(), block_with_peer.block.hash_slow()); + + // Verify the signature is from the unauthorized signer + let hash = sig_encode_hash(&block_with_peer.block); + let recovered = block_with_peer.signature.recover_address_from_prehash(&hash).unwrap(); + assert_eq!( + recovered, + unauthorized_signer.address(), + "Block should be signed by unauthorized signer" + ); + } else { + panic!("Failed to receive valid block at follower"); + } + + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Node0 reputation should be lower after sending block with invalid signature", + || async { + let current_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + current_reputation < initial_reputation + }, + ) + .await; + + // === Phase 3: Send valid block with invalid signature === + // Get current reputation of node0 from node1's perspective + let current_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + + let invalid_signature = Signature::new(U256::from(1), U256::from(1), false); + + // Create a new block with the same structure as before but with an invalid signature. + // We need to make sure the block is different so that it is not filtered. + block1.header.timestamp += 1; + node0_network_handle.announce_block(block1.clone(), invalid_signature); + + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Node0 reputation should be lower after sending block with invalid signature", + || async { + let all_peers = node1_network_handle.inner().get_all_peers().await.unwrap(); + if all_peers.is_empty() { + return true; // No peers to check, assume penalization and peer0 is blocked and + // disconnected + } + + let penalized_reputation = + node1_network_handle.inner().reputation_by_id(*node0_id).await.unwrap().unwrap(); + penalized_reputation < current_reputation + }, + ) + .await; +} + /// Helper function to wait until a predicate is true or a timeout occurs. pub async fn eventually(timeout: Duration, tick: Duration, message: &str, mut predicate: F) where