From e0770418b51ae23d035519a414935b745b50b22d Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 13 Aug 2025 14:21:17 +0800 Subject: [PATCH 01/12] Refactor `can_handle_reorgs_while_sequencing` test to also include a follower node and add a bunch of convenience methods --- Cargo.lock | 44 ++++ crates/engine/src/future/mod.rs | 2 +- crates/manager/src/manager/event.rs | 2 +- crates/network/src/event.rs | 2 +- crates/node/Cargo.toml | 1 + crates/node/tests/e2e.rs | 334 ++++++++++++++++++++-------- crates/signer/src/event.rs | 2 +- 7 files changed, 289 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e32b07c9..e3f9ee62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2594,6 +2594,33 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "color-eyre" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5920befb47832a6d61ee3a3a846565cfa39b331331e68a3b1d1116630f2f26d" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8b88ea9df13354b55bc7234ebcce36e6ef896aca2e42a15de9e10edce01b427" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -6096,6 +6123,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dd4f4a2c8405440fd0462561f0e5806bd0f77e86f51c761481bdd4018b545e" + [[package]] name = "p256" version = "0.13.2" @@ -10321,6 +10354,7 @@ dependencies = [ "aws-config", "aws-sdk-kms", "clap", + "color-eyre", "eyre", "futures", "reqwest", @@ -12683,6 +12717,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-error" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b1581020d7a273442f5b45074a6a57d5757ad0a47dac0e9f0bd57b81936f3db" +dependencies = [ + "tracing", + "tracing-subscriber 0.3.19", +] + [[package]] name = "tracing-futures" version = "0.2.5" diff --git a/crates/engine/src/future/mod.rs b/crates/engine/src/future/mod.rs index 8c83b097..9d5bf709 100644 --- a/crates/engine/src/future/mod.rs +++ b/crates/engine/src/future/mod.rs @@ -42,7 +42,7 @@ type BlockImportFuture = Pin< >; /// An enum that represents the different outcomes of an L1 consolidation job. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum ConsolidationOutcome { /// Represents a successful consolidation outcome with the consolidated block info and batch /// info. diff --git a/crates/manager/src/manager/event.rs b/crates/manager/src/manager/event.rs index 9d2808b9..bd279373 100644 --- a/crates/manager/src/manager/event.rs +++ b/crates/manager/src/manager/event.rs @@ -4,7 +4,7 @@ use scroll_engine::ConsolidationOutcome; use scroll_network::NewBlockWithPeer; /// An event that can be emitted by the rollup node manager. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum RollupManagerEvent { /// A new block has been received from the network. NewBlockReceived(NewBlockWithPeer), diff --git a/crates/network/src/event.rs b/crates/network/src/event.rs index ae81484a..024388a6 100644 --- a/crates/network/src/event.rs +++ b/crates/network/src/event.rs @@ -3,7 +3,7 @@ use reth_network_api::PeerId; use reth_scroll_primitives::ScrollBlock; /// A new block with the peer id that it was received from. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct NewBlockWithPeer { pub peer_id: PeerId, pub block: ScrollBlock, diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 548f147e..4402ff38 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -104,6 +104,7 @@ reth-tracing.workspace = true rollup-node = { workspace = true, features = ["test-utils"] } scroll-alloy-rpc-types-engine.workspace = true serde_json = { version = "1.0.94", default-features = false, features = ["alloc"] } +color-eyre = "0.6" [features] test-utils = [ diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 12065081..0f586cf2 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -4,10 +4,16 @@ use alloy_eips::BlockNumberOrTag; use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256}; use alloy_signer::Signer; use alloy_signer_local::PrivateKeySigner; +use eyre::Ok; use futures::StreamExt; use reth_chainspec::EthChainSpec; +use reth_e2e_test_utils::{NodeHelperType, TmpDB}; use reth_network::{NetworkConfigBuilder, Peers, PeersInfo}; use reth_network_api::block::EthWireProvider; +use reth_node_api::{Block, NodeTypesWithDBAdapter}; +use reth_primitives_traits::Transaction; +use reth_provider::providers::BlockchainProvider; +use reth_revm::context::block; use reth_rpc_api::EthApiServer; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::ScrollNetworkPrimitives; @@ -21,7 +27,7 @@ use rollup_node::{ }, BeaconProviderArgs, ConsensusAlgorithm, ConsensusArgs, DatabaseArgs, EngineDriverArgs, GasPriceOracleArgs, L1ProviderArgs, NetworkArgs as ScrollNetworkArgs, RollupNodeContext, - ScrollRollupNodeConfig, SequencerArgs, + ScrollRollupNode, ScrollRollupNodeConfig, SequencerArgs, }; use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent}; use rollup_node_primitives::{sig_encode_hash, BatchCommitData, ConsensusUpdate}; @@ -239,25 +245,6 @@ async fn can_penalize_peer_for_invalid_block() { .await; } -/// Helper function to wait until a predicate is true or a timeout occurs. -pub async fn eventually(timeout: Duration, tick: Duration, message: &str, mut predicate: F) -where - F: FnMut() -> Fut, - Fut: std::future::Future, -{ - let mut interval = time::interval(tick); - let start = time::Instant::now(); - loop { - if predicate().await { - return; - } - - assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}"); - - interval.tick().await; - } -} - #[allow(clippy::large_stack_frames)] #[tokio::test] async fn can_sequence_and_gossip_transactions() { @@ -902,95 +889,105 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { #[tokio::test] async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { reth_tracing::init_test_tracing(); + color_eyre::install()?; let chain_spec = (*SCROLL_DEV).clone(); - // Launch a node - let mut config = default_test_scroll_rollup_node_config(); + // Launch 2 nodes: node0=sequencer and node1=follower. + let mut config = default_sequencer_test_scroll_rollup_node_config(); config.sequencer_args.block_time = 0; - let (mut nodes, _tasks, _) = setup_engine(config, 1, chain_spec.clone(), false, false).await?; - let node = nodes.pop().unwrap(); + let (mut nodes, _tasks, _) = setup_engine(config, 2, chain_spec.clone(), false, false).await?; + let node0 = nodes.remove(0); + let node1 = nodes.remove(0); - // Instantiate the rollup node manager. - let mut config = default_sequencer_test_scroll_rollup_node_config(); - let path = node.inner.config.datadir().db().join("scroll.db?mode=rwc"); - let path = PathBuf::from("sqlite://".to_string() + &*path.to_string_lossy()); - config.database_args.path = Some(path.clone()); - config.beacon_provider_args.url = Some( - "http://dummy:8545" - .parse() - .expect("valid url that will not be used as test batches use calldata"), - ); - config.engine_driver_args.sync_at_startup = false; - let (nodes, _tasks, _) = setup_engine(config, 1, chain_spec, false, false).await?; - let node = nodes.first().unwrap(); - let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.as_ref().unwrap(); - let mut rnm_events = - node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; - let sequencer_rnm_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); + // Get handles + let node0_rnm_handle = node0.inner.add_ons_handle.rollup_manager_handle.clone(); + let mut node0_rnm_events = node0_rnm_handle.get_event_listener().await?; + let node0_l1_watcher_tx = node0.inner.add_ons_handle.l1_watcher_tx.as_ref().unwrap(); - // Send an L1 message. - let message = TxL1Message { - queue_index: 0, - gas_limit: 21000, - to: Default::default(), - value: Default::default(), - sender: address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), - input: Default::default(), - }; + let node1_rnm_handle = node1.inner.add_ons_handle.rollup_manager_handle.clone(); + let mut node1_rnm_events = node1_rnm_handle.get_event_listener().await?; + let node1_l1_watcher_tx = node1.inner.add_ons_handle.l1_watcher_tx.as_ref().unwrap(); // Let the sequencer build 10 blocks before performing the reorg process. - let mut i = 0; - loop { - sequencer_rnm_handle.build_block().await; - if let Some(RollupManagerEvent::BlockSequenced(_)) = rnm_events.next().await { - if i == 10 { - break - } - i += 1; - } + for i in 1..=10 { + node0_rnm_handle.build_block().await; + wait_for_block_sequenced_5s(&mut node0_rnm_events, i).await?; } + // Assert that the follower node has received all 10 blocks from the sequencer node. + wait_for_block_imported_5s(&mut node1_rnm_events, 10).await?; + // Send a L1 message and wait for it to be indexed. - l1_watcher_tx - .send(Arc::new(L1Notification::L1Message { message, block_number: 10, block_timestamp: 0 })) - .await?; - loop { - if let Some(RollupManagerEvent::L1MessageIndexed(index)) = rnm_events.next().await { - assert_eq!(index, 0); - break - } - } - l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + let l1_message_notification = L1Notification::L1Message { + message: TxL1Message { + queue_index: 0, + gas_limit: 21000, + to: Default::default(), + value: Default::default(), + sender: address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), + input: Default::default(), + }, + block_number: 10, + block_timestamp: 0, + }; - // Wait for block that contains the L1 message. - sequencer_rnm_handle.build_block().await; - let l2_reorged_height; - loop { - if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { - if block.body.transactions.iter().any(|tx| tx.is_l1_message()) { - l2_reorged_height = block.header.number; - break - } + // Send the L1 message to the sequencer node. + node0_l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?; + wait_for_event_5s(&mut node0_rnm_events, RollupManagerEvent::L1MessageIndexed(0)).await?; + node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + + // Send L1 the L1 message to follower node. + // TODO: maybe let node reject block first? -> do in different test. + node1_l1_watcher_tx.send(Arc::new(l1_message_notification)).await?; + wait_for_event_5s(&mut node1_rnm_events, RollupManagerEvent::L1MessageIndexed(0)).await?; + node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + + // Build block that contains the L1 message. + node0_rnm_handle.build_block().await; + wait_for_event_predicate_5s(&mut node0_rnm_events, |e| { + if let RollupManagerEvent::BlockImported(block) = e { + block.body.transactions.len() == 1 && + block.body.transactions.iter().any(|tx| tx.is_l1_message()) + } else { + false } + }) + .await?; + + let l2_reorged_height = 15; + for i in 12..=l2_reorged_height { + node0_rnm_handle.build_block().await; + wait_for_block_sequenced_5s(&mut node0_rnm_events, i).await?; } + // Assert that the follower node has received the latest block from the sequencer node. + wait_for_block_imported_5s(&mut node1_rnm_events, 15).await?; + // Issue and wait for the reorg. - l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; - loop { - if let Some(RollupManagerEvent::Reorg(height)) = rnm_events.next().await { - assert_eq!(height, 9); - break - } - } + node0_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; + wait_for_event_5s(&mut node0_rnm_events, RollupManagerEvent::Reorg(9)).await?; + node1_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; + wait_for_event_5s(&mut node1_rnm_events, RollupManagerEvent::Reorg(9)).await?; + + // TODO: verify latest block and that reorg on L2 happened correctly. + let latestBlock = node0 + .rpc + .inner + .eth_api() + .block_by_number(BlockNumberOrTag::Latest, false) + .await? + .expect("head block must exist"); - // Get the next sequenced L2 block. - sequencer_rnm_handle.build_block().await; - loop { - if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { - assert_eq!(block.number, l2_reorged_height); - break - } - } + println!("Latest block on node0: {:?}", latestBlock); + // return Ok(()); + + // Since the L1 reorg reverted the L1 message included in block 10, the sequencer + // should produce a new block at height 10. + node0_rnm_handle.build_block().await; + wait_for_block_sequenced_5s(&mut node0_rnm_events, 10).await?; + + // Assert that the follower node has received the new block from the sequencer node. + wait_for_block_imported_5s(&mut node1_rnm_events, 10).await?; Ok(()) } @@ -1136,12 +1133,142 @@ pub fn read_to_bytes>(path: P) -> eyre::Result Ok(Bytes::from_str(&std::fs::read_to_string(path)?)?) } +// async fn latest_block( +// node: NodeHelperType< +// ScrollRollupNode, +// BlockchainProvider>, +// >, +// ) -> eyre::Result> { +// node.rpc +// .inner +// .eth_api() +// .block_by_number(BlockNumberOrTag::Latest, false) +// .await? +// .ok_or_else(|| eyre::eyre!("Latest block not found")) +// } + +async fn wait_for_block_sequenced( + events: &mut EventStream, + block_number: u64, + timeout: Duration, +) -> eyre::Result { + let mut block = None; + + wait_for_event_predicate( + events, + |e| { + if let RollupManagerEvent::BlockSequenced(b) = e { + if b.header.number == block_number { + block = Some(b.clone()); + return true; + } + } + + return false; + }, + timeout, + ) + .await?; + + block.ok_or(eyre::eyre!("Block with number {block_number} was not sequenced")) +} + +async fn wait_for_block_sequenced_5s( + events: &mut EventStream, + block_number: u64, +) -> eyre::Result { + wait_for_block_sequenced(events, block_number, Duration::from_secs(5)).await +} + +async fn wait_for_block_imported( + events: &mut EventStream, + block_number: u64, + timeout: Duration, +) -> eyre::Result { + let mut block = None; + + wait_for_event_predicate( + events, + |e| { + if let RollupManagerEvent::BlockImported(b) = e { + if b.header.number == block_number { + block = Some(b.clone()); + return true; + } + } + + return false; + }, + timeout, + ) + .await?; + + block.ok_or(eyre::eyre!("Block with number {block_number} was not imported")) +} + +async fn wait_for_block_imported_5s( + events: &mut EventStream, + block_number: u64, +) -> eyre::Result { + wait_for_block_imported(events, block_number, Duration::from_secs(5)).await +} + +async fn wait_for_event_predicate( + event_stream: &mut EventStream, + mut predicate: impl FnMut(RollupManagerEvent) -> bool, + timeout: Duration, +) -> eyre::Result<()> { + let sleep = tokio::time::sleep(timeout); + tokio::pin!(sleep); + + loop { + tokio::select! { + maybe_event = event_stream.next() => { + match maybe_event { + Some(e) if predicate(e.clone()) => { + return Ok(()); + } + Some(e) => { + println!("Ignoring event: {:?}", e); + continue + }, // Ignore other events + None => return Err(eyre::eyre!("Event stream ended unexpectedly")), + } + } + _ = &mut sleep => return Err(eyre::eyre!("Timeout while waiting for event")), + } + } +} + +async fn wait_for_event_predicate_5s( + event_stream: &mut EventStream, + predicate: impl FnMut(RollupManagerEvent) -> bool, +) -> eyre::Result<()> { + wait_for_event_predicate(event_stream, predicate, Duration::from_secs(5)).await +} + +async fn wait_for_event( + event_stream: &mut EventStream, + event: RollupManagerEvent, + timeout: Duration, +) -> eyre::Result<()> { + wait_for_event_predicate(event_stream, |e| e == event, timeout).await +} + +async fn wait_for_event_5s( + event_stream: &mut EventStream, + event: RollupManagerEvent, +) -> eyre::Result<()> { + wait_for_event(event_stream, event, Duration::from_secs(5)).await +} + /// Waits for n events to be emitted. async fn wait_n_events( events: &mut EventStream, - matches: impl Fn(RollupManagerEvent) -> bool, + mut matches: impl FnMut(RollupManagerEvent) -> bool, mut n: u64, ) { + // TODO: refactor using `wait_for_event_predicate` while let Some(event) = events.next().await { if matches(event) { n -= 1; @@ -1151,3 +1278,22 @@ async fn wait_n_events( } } } + +/// Helper function to wait until a predicate is true or a timeout occurs. +pub async fn eventually(timeout: Duration, tick: Duration, message: &str, mut predicate: F) +where + F: FnMut() -> Fut, + Fut: std::future::Future, +{ + let mut interval = time::interval(tick); + let start = time::Instant::now(); + loop { + if predicate().await { + return; + } + + assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}"); + + interval.tick().await; + } +} diff --git a/crates/signer/src/event.rs b/crates/signer/src/event.rs index 5d28886d..de421e80 100644 --- a/crates/signer/src/event.rs +++ b/crates/signer/src/event.rs @@ -2,7 +2,7 @@ use alloy_signer::Signature; use reth_scroll_primitives::ScrollBlock; /// An enum representing the events that can be emitted by the signer. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum SignerEvent { /// A block has been signed by the signer. SignedBlock { From cc2e7c45b43d7512a02c32abd106d65fe311e24f Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Wed, 13 Aug 2025 13:22:42 +0200 Subject: [PATCH 02/12] feat: attach L1 block number to ScrollPayloadAttributes --- crates/derivation-pipeline/src/lib.rs | 16 +++++++----- crates/engine/src/error.rs | 4 +-- crates/engine/src/future/mod.rs | 36 ++++++++++++++++----------- crates/signer/src/event.rs | 2 +- 4 files changed, 34 insertions(+), 24 deletions(-) diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index dc31ab9c..4ab42776 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -166,14 +166,14 @@ impl

Stream for DerivationPipeline

where P: L1Provider + Clone + Unpin + Send + Sync + 'static, { - type Item = ScrollPayloadAttributesWithBatchInfo; + type Item = WithBlockNumber; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); // return attributes from the queue if any. if let Some(attribute) = this.attributes_queue.pop_front() { - return Poll::Ready(Some(attribute.inner)) + return Poll::Ready(Some(attribute)) } // if future is None and the batch queue is empty, store the waker and return. @@ -438,8 +438,10 @@ mod tests { // check the correctness of the last attribute. let mut attribute = ScrollPayloadAttributes::default(); - while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) = - pipeline.next().await + while let Some(WithBlockNumber { + inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }, + .. + }) = pipeline.next().await { if a.payload_attributes.timestamp == 1696935657 { attribute = a; @@ -496,8 +498,10 @@ mod tests { // check the correctness of the last attribute. let mut attribute = ScrollPayloadAttributes::default(); - while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) = - pipeline.next().await + while let Some(WithBlockNumber { + inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }, + .. + }) = pipeline.next().await { if a.payload_attributes.timestamp == 1696935657 { attribute = a; diff --git a/crates/engine/src/error.rs b/crates/engine/src/error.rs index 428cb68f..3543780d 100644 --- a/crates/engine/src/error.rs +++ b/crates/engine/src/error.rs @@ -1,5 +1,5 @@ use alloy_rpc_types_engine::PayloadError; -use rollup_node_primitives::ScrollPayloadAttributesWithBatchInfo; +use rollup_node_primitives::{ScrollPayloadAttributesWithBatchInfo, WithBlockNumber}; use scroll_alloy_provider::ScrollEngineApiError; /// The error type for the engine API. @@ -19,5 +19,5 @@ pub enum EngineDriverError { ForkchoiceUpdateFailed(ScrollEngineApiError), /// The payload id field is missing in the forkchoice update response. #[error("Forkchoice update response missing payload id")] - MissingPayloadId(ScrollPayloadAttributesWithBatchInfo), + MissingPayloadId(WithBlockNumber), } diff --git a/crates/engine/src/future/mod.rs b/crates/engine/src/future/mod.rs index 9d5bf709..1aef8c74 100644 --- a/crates/engine/src/future/mod.rs +++ b/crates/engine/src/future/mod.rs @@ -10,7 +10,7 @@ use reth_scroll_engine_primitives::try_into_block; use reth_scroll_primitives::ScrollBlock; use rollup_node_primitives::{ BatchInfo, BlockInfo, L2BlockInfoWithL1Messages, MeteredFuture, - ScrollPayloadAttributesWithBatchInfo, + ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, }; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; @@ -92,8 +92,8 @@ pub(crate) type BuildNewPayloadFuture = /// An enum that represents the different types of futures that can be executed on the engine API. /// It can be a block import job, an L1 consolidation job, or a new payload processing. pub(crate) enum EngineFuture { - BlockImport(BlockImportFuture), - L1Consolidation(L1ConsolidationFuture), + BlockImport(WithBlockNumber), + L1Consolidation(WithBlockNumber), NewPayload(NewPayloadFuture), } @@ -107,7 +107,10 @@ impl EngineFuture { where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, { - Self::BlockImport(Box::pin(handle_execution_payload(client, block_with_peer, fcs))) + Self::BlockImport(WithBlockNumber::new( + block_with_peer.block.header.number, + Box::pin(handle_execution_payload(client, block_with_peer, fcs)), + )) } /// Creates a new [`EngineFuture::L1Consolidation`] future from the provided parameters. @@ -115,18 +118,21 @@ impl EngineFuture { client: Arc, execution_payload_provider: P, fcs: ForkchoiceState, - payload_attributes: ScrollPayloadAttributesWithBatchInfo, + payload_attributes: WithBlockNumber, ) -> Self where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, P: Provider + Unpin + Send + Sync + 'static, { - Self::L1Consolidation(Box::pin(handle_payload_attributes( - client, - execution_payload_provider, - fcs, - payload_attributes, - ))) + Self::L1Consolidation(WithBlockNumber::new( + payload_attributes.number, + Box::pin(handle_payload_attributes( + client, + execution_payload_provider, + fcs, + payload_attributes, + )), + )) } /// Creates a new [`EngineFuture::NewPayload`] future from the provided parameters. @@ -150,8 +156,8 @@ impl Future for EngineFuture { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match this { - Self::BlockImport(fut) => fut.as_mut().poll(cx).map(Into::into), - Self::L1Consolidation(fut) => fut.as_mut().poll(cx).map(Into::into), + Self::BlockImport(fut) => fut.inner.as_mut().poll(cx).map(Into::into), + Self::L1Consolidation(fut) => fut.inner.as_mut().poll(cx).map(Into::into), Self::NewPayload(fut) => fut.as_mut().poll(cx).map(Into::into), } } @@ -239,7 +245,7 @@ async fn handle_payload_attributes( client: Arc, provider: P, fcs: ForkchoiceState, - payload_attributes_with_batch_info: ScrollPayloadAttributesWithBatchInfo, + payload_attributes_with_batch_info: WithBlockNumber, ) -> Result where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, @@ -248,7 +254,7 @@ where tracing::trace!(target: "scroll::engine::future", ?fcs, ?payload_attributes_with_batch_info, "handling payload attributes"); let ScrollPayloadAttributesWithBatchInfo { mut payload_attributes, batch_info } = - payload_attributes_with_batch_info.clone(); + payload_attributes_with_batch_info.inner.clone(); let maybe_execution_payload = provider .get_block((fcs.safe_block_info().number + 1).into()) diff --git a/crates/signer/src/event.rs b/crates/signer/src/event.rs index de421e80..83324429 100644 --- a/crates/signer/src/event.rs +++ b/crates/signer/src/event.rs @@ -2,7 +2,7 @@ use alloy_signer::Signature; use reth_scroll_primitives::ScrollBlock; /// An enum representing the events that can be emitted by the signer. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum SignerEvent { /// A block has been signed by the signer. SignedBlock { From 26c09d142a68f6cc995810b5c96029e5332fdebd Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Wed, 13 Aug 2025 13:23:51 +0200 Subject: [PATCH 03/12] feat: correctly handle L1 reorgs in driver --- crates/engine/src/driver.rs | 68 ++++++++++++++++++++++++++++--- crates/manager/src/manager/mod.rs | 15 +++---- crates/node/tests/e2e.rs | 9 ++-- 3 files changed, 73 insertions(+), 19 deletions(-) diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index 73d97833..b7efd7b4 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -7,7 +7,9 @@ use crate::{ use alloy_provider::Provider; use futures::{ready, task::AtomicWaker, FutureExt, Stream}; -use rollup_node_primitives::{BlockInfo, MeteredFuture, ScrollPayloadAttributesWithBatchInfo}; +use rollup_node_primitives::{ + BlockInfo, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, +}; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_alloy_provider::ScrollEngineApi; @@ -39,7 +41,7 @@ pub struct EngineDriver { /// Block building duration. block_building_duration: Duration, /// The pending payload attributes derived from batches on L1. - l1_payload_attributes: VecDeque, + l1_payload_attributes: VecDeque>, /// The pending block imports received over the network. block_imports: VecDeque, /// The payload attributes associated with the next block to be built. @@ -121,6 +123,57 @@ where } } + /// Handle L1 reorg, with the L1 block number reorged to, and whether this reorged the head or + /// batches. + pub fn handle_l1_reorg( + &mut self, + l1_block_number: u64, + reorged_unsafe_head: Option, + reorged_safe_head: Option, + ) { + // On an unsafe head reorg: clear the payload building future, reset the unsafe head and + // drop the engine future if it's a `NewPayload` or `BlockImport` with block number > L2 + // reorged number. + if let Some(l2_head_block_info) = reorged_unsafe_head { + self.payload_building_future = None; + self.set_head_block_info(l2_head_block_info); + if let Some(MeteredFuture { fut, .. }) = self.engine_future.as_ref() { + match fut { + EngineFuture::BlockImport(WithBlockNumber { number, .. }) + if number > &l2_head_block_info.number => + { + self.engine_future = None + } + // `NewPayload` future is ONLY instantiated when the payload building future is + // done, and we want to issue the payload to the EN. Thus, we also clear it on a + // L2 reorg. + EngineFuture::NewPayload(_) => self.engine_future = None, + _ => {} + } + } + } + + // On a safe head reorg: reset the safe head. + if let Some(safe_block_info) = reorged_safe_head { + self.set_safe_block_info(safe_block_info); + } + + // drop the engine future if it's a `L1Consolidation` future associated with a L1 block + // number > l1_block_number. + if matches!( + self.engine_future.as_ref(), + Some(MeteredFuture { + fut: EngineFuture::L1Consolidation(WithBlockNumber { number, .. }), + .. + }) if number > &l1_block_number + ) { + self.engine_future = None; + } + + // retain the L1 payload attributes with block number <= L1 block. + self.l1_payload_attributes.retain(|attribute| attribute.number <= l1_block_number); + } + /// Handles a block import request by adding it to the queue and waking up the driver. pub fn handle_block_import(&mut self, block_with_peer: NewBlockWithPeer) { tracing::trace!(target: "scroll::engine", ?block_with_peer, "new block import request received"); @@ -138,7 +191,10 @@ where /// Handles a [`ScrollPayloadAttributes`] sourced from L1 by initiating a task sending the /// attribute to the EN via the [`EngineDriver`]. - pub fn handle_l1_consolidation(&mut self, attributes: ScrollPayloadAttributesWithBatchInfo) { + pub fn handle_l1_consolidation( + &mut self, + attributes: WithBlockNumber, + ) { self.l1_payload_attributes.push_back(attributes); self.waker.wake(); } @@ -186,7 +242,7 @@ where self.metrics.block_import_duration.record(duration.as_secs_f64()); // Return the block import outcome - return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome) + return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to import block"); @@ -213,7 +269,7 @@ where // record the metric. self.metrics.l1_consolidation_duration.record(duration.as_secs_f64()); - return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)) + return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to consolidate block derived from L1") @@ -234,7 +290,7 @@ where self.metrics.build_new_payload_duration.record(duration.as_secs_f64()); self.metrics.gas_per_block.record(block.gas_used as f64); - return Some(EngineDriverEvent::NewPayload(block)) + return Some(EngineDriverEvent::NewPayload(block)); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to build new payload"); diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 4557ac7a..787304bc 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -268,15 +268,12 @@ where l2_head_block_info, l2_safe_block_info, } => { - // Update the [`EngineDriver`] fork choice state with the new L2 head info. - if let Some(l2_head_block_info) = l2_head_block_info { - self.engine.set_head_block_info(l2_head_block_info); - } - - // Update the [`EngineDriver`] fork choice state with the new L2 safe info. - if let Some(safe_block_info) = l2_safe_block_info { - self.engine.set_safe_block_info(safe_block_info); - } + // Handle the reorg in the engine driver. + self.engine.handle_l1_reorg( + l1_block_number, + l2_head_block_info, + l2_safe_block_info, + ); // Update the [`Sequencer`] with the new L1 head info and queue index. if let Some(sequencer) = self.sequencer.as_mut() { diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 0f586cf2..6ee45385 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -946,7 +946,8 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { node0_rnm_handle.build_block().await; wait_for_event_predicate_5s(&mut node0_rnm_events, |e| { if let RollupManagerEvent::BlockImported(block) = e { - block.body.transactions.len() == 1 && + block.header.number == 11 && + block.body.transactions.len() == 1 && block.body.transactions.iter().any(|tx| tx.is_l1_message()) } else { false @@ -981,13 +982,13 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { println!("Latest block on node0: {:?}", latestBlock); // return Ok(()); - // Since the L1 reorg reverted the L1 message included in block 10, the sequencer + // Since the L1 reorg reverted the L1 message included in block 11, the sequencer // should produce a new block at height 10. node0_rnm_handle.build_block().await; - wait_for_block_sequenced_5s(&mut node0_rnm_events, 10).await?; + wait_for_block_sequenced_5s(&mut node0_rnm_events, 11).await?; // Assert that the follower node has received the new block from the sequencer node. - wait_for_block_imported_5s(&mut node1_rnm_events, 10).await?; + wait_for_block_imported_5s(&mut node1_rnm_events, 11).await?; Ok(()) } From aa49f4d22b2b7834e0fa193b4d436fc203f78d6d Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 14 Aug 2025 13:02:06 +0800 Subject: [PATCH 04/12] adjust can_handle_l1_message_reorg test to assert correct reorg conditions --- Cargo.lock | 2 ++ crates/node/Cargo.toml | 2 ++ crates/node/tests/e2e.rs | 65 +++++++++++++++++++++------------------- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3f9ee62..ddccc025 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10346,6 +10346,7 @@ dependencies = [ "alloy-provider", "alloy-rpc-client", "alloy-rpc-types-engine 1.0.23", + "alloy-rpc-types-eth", "alloy-signer", "alloy-signer-aws", "alloy-signer-local", @@ -10401,6 +10402,7 @@ dependencies = [ "scroll-alloy-hardforks", "scroll-alloy-network", "scroll-alloy-provider", + "scroll-alloy-rpc-types", "scroll-alloy-rpc-types-engine", "scroll-db", "scroll-derivation-pipeline", diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 4402ff38..b1a82c47 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -75,6 +75,7 @@ reth-provider = { workspace = true, optional = true } reth-rpc-server-types = { workspace = true, optional = true } scroll-alloy-rpc-types-engine = { workspace = true, optional = true } scroll-derivation-pipeline = { workspace = true, optional = true } +scroll-alloy-rpc-types = { git = "https://github.com/scroll-tech/reth.git", default-features = false } scroll-db.workspace = true scroll-engine.workspace = true @@ -105,6 +106,7 @@ rollup-node = { workspace = true, features = ["test-utils"] } scroll-alloy-rpc-types-engine.workspace = true serde_json = { version = "1.0.94", default-features = false, features = ["alloc"] } color-eyre = "0.6" +alloy-rpc-types-eth = { workspace = true } [features] test-utils = [ diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 6ee45385..316a47d3 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -2,6 +2,7 @@ use alloy_eips::BlockNumberOrTag; use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256}; +use alloy_rpc_types_eth::Block; use alloy_signer::Signer; use alloy_signer_local::PrivateKeySigner; use eyre::Ok; @@ -10,7 +11,7 @@ use reth_chainspec::EthChainSpec; use reth_e2e_test_utils::{NodeHelperType, TmpDB}; use reth_network::{NetworkConfigBuilder, Peers, PeersInfo}; use reth_network_api::block::EthWireProvider; -use reth_node_api::{Block, NodeTypesWithDBAdapter}; +use reth_node_api::NodeTypesWithDBAdapter; use reth_primitives_traits::Transaction; use reth_provider::providers::BlockchainProvider; use reth_revm::context::block; @@ -35,6 +36,7 @@ use rollup_node_providers::BlobSource; use rollup_node_sequencer::L1MessageInclusionMode; use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; +use scroll_alloy_rpc_types::Transaction as ScrollAlloyTransaction; use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET}; use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler}; use std::{path::PathBuf, sync::Arc, time::Duration}; @@ -887,7 +889,7 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { #[allow(clippy::large_stack_frames)] #[tokio::test] -async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { +async fn can_handle_l1_message_reorg() -> eyre::Result<()> { reth_tracing::init_test_tracing(); color_eyre::install()?; let chain_spec = (*SCROLL_DEV).clone(); @@ -937,7 +939,6 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; // Send L1 the L1 message to follower node. - // TODO: maybe let node reject block first? -> do in different test. node1_l1_watcher_tx.send(Arc::new(l1_message_notification)).await?; wait_for_event_5s(&mut node1_rnm_events, RollupManagerEvent::L1MessageIndexed(0)).await?; node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; @@ -955,8 +956,7 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { }) .await?; - let l2_reorged_height = 15; - for i in 12..=l2_reorged_height { + for i in 12..=15 { node0_rnm_handle.build_block().await; wait_for_block_sequenced_5s(&mut node0_rnm_events, i).await?; } @@ -964,32 +964,33 @@ async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { // Assert that the follower node has received the latest block from the sequencer node. wait_for_block_imported_5s(&mut node1_rnm_events, 15).await?; + assert_eq!(latest_block(&node0).await?.header.number, 15); + assert_eq!(latest_block(&node1).await?.header.number, 15); + // Issue and wait for the reorg. node0_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; wait_for_event_5s(&mut node0_rnm_events, RollupManagerEvent::Reorg(9)).await?; node1_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; wait_for_event_5s(&mut node1_rnm_events, RollupManagerEvent::Reorg(9)).await?; - // TODO: verify latest block and that reorg on L2 happened correctly. - let latestBlock = node0 - .rpc - .inner - .eth_api() - .block_by_number(BlockNumberOrTag::Latest, false) - .await? - .expect("head block must exist"); - - println!("Latest block on node0: {:?}", latestBlock); - // return Ok(()); + // TODO: this can only become true if we do https://github.com/scroll-tech/rollup-node/issues/254 + // assert_eq!(latest_block(&node0).await?.header.number, 10); + // assert_eq!(latest_block(&node1).await?.header.number, 10); // Since the L1 reorg reverted the L1 message included in block 11, the sequencer - // should produce a new block at height 10. + // should produce a new block at height 11. node0_rnm_handle.build_block().await; wait_for_block_sequenced_5s(&mut node0_rnm_events, 11).await?; // Assert that the follower node has received the new block from the sequencer node. wait_for_block_imported_5s(&mut node1_rnm_events, 11).await?; + // TODO: why is this not known in the engine without the sleep? + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + assert_eq!(latest_block(&node0).await?.header.number, 11); + assert_eq!(latest_block(&node1).await?.header.number, 11); + Ok(()) } @@ -1134,19 +1135,21 @@ pub fn read_to_bytes>(path: P) -> eyre::Result Ok(Bytes::from_str(&std::fs::read_to_string(path)?)?) } -// async fn latest_block( -// node: NodeHelperType< -// ScrollRollupNode, -// BlockchainProvider>, -// >, -// ) -> eyre::Result> { -// node.rpc -// .inner -// .eth_api() -// .block_by_number(BlockNumberOrTag::Latest, false) -// .await? -// .ok_or_else(|| eyre::eyre!("Latest block not found")) -// } +async fn latest_block( + node: &NodeHelperType< + ScrollRollupNode, + BlockchainProvider>, + >, +) -> eyre::Result> { + eyre::Ok( + node.rpc + .inner + .eth_api() + .block_by_number(BlockNumberOrTag::Latest, false) + .await? + .expect("latest block must exist"), + ) +} async fn wait_for_block_sequenced( events: &mut EventStream, @@ -1230,7 +1233,7 @@ async fn wait_for_event_predicate( return Ok(()); } Some(e) => { - println!("Ignoring event: {:?}", e); + tracing::debug!(target: "TODO:nodeX", "ignoring event {:?}", e); continue }, // Ignore other events None => return Err(eyre::eyre!("Event stream ended unexpectedly")), From 4be2b6a90d72d6a133659eab2a844819f1ab99df Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 14 Aug 2025 13:15:04 +0800 Subject: [PATCH 05/12] fix linter errors --- crates/engine/src/future/mod.rs | 2 +- crates/node/tests/e2e.rs | 29 ++++++++++++----------------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/crates/engine/src/future/mod.rs b/crates/engine/src/future/mod.rs index 1aef8c74..6afa6381 100644 --- a/crates/engine/src/future/mod.rs +++ b/crates/engine/src/future/mod.rs @@ -42,7 +42,7 @@ type BlockImportFuture = Pin< >; /// An enum that represents the different outcomes of an L1 consolidation job. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum ConsolidationOutcome { /// Represents a successful consolidation outcome with the consolidated block info and batch /// info. diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 316a47d3..4cb33e68 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -12,9 +12,7 @@ use reth_e2e_test_utils::{NodeHelperType, TmpDB}; use reth_network::{NetworkConfigBuilder, Peers, PeersInfo}; use reth_network_api::block::EthWireProvider; use reth_node_api::NodeTypesWithDBAdapter; -use reth_primitives_traits::Transaction; use reth_provider::providers::BlockchainProvider; -use reth_revm::context::block; use reth_rpc_api::EthApiServer; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::ScrollNetworkPrimitives; @@ -1141,14 +1139,12 @@ async fn latest_block( BlockchainProvider>, >, ) -> eyre::Result> { - eyre::Ok( - node.rpc - .inner - .eth_api() - .block_by_number(BlockNumberOrTag::Latest, false) - .await? - .expect("latest block must exist"), - ) + node.rpc + .inner + .eth_api() + .block_by_number(BlockNumberOrTag::Latest, false) + .await? + .ok_or_else(|| eyre::eyre!("Latest block not found")) } async fn wait_for_block_sequenced( @@ -1163,18 +1159,18 @@ async fn wait_for_block_sequenced( |e| { if let RollupManagerEvent::BlockSequenced(b) = e { if b.header.number == block_number { - block = Some(b.clone()); + block = Some(b); return true; } } - return false; + false }, timeout, ) .await?; - block.ok_or(eyre::eyre!("Block with number {block_number} was not sequenced")) + block.ok_or_else(|| eyre::eyre!("Block with number {block_number} was not sequenced")) } async fn wait_for_block_sequenced_5s( @@ -1196,18 +1192,18 @@ async fn wait_for_block_imported( |e| { if let RollupManagerEvent::BlockImported(b) = e { if b.header.number == block_number { - block = Some(b.clone()); + block = Some(b); return true; } } - return false; + false }, timeout, ) .await?; - block.ok_or(eyre::eyre!("Block with number {block_number} was not imported")) + block.ok_or_else(|| eyre::eyre!("Block with number {block_number} was not imported")) } async fn wait_for_block_imported_5s( @@ -1234,7 +1230,6 @@ async fn wait_for_event_predicate( } Some(e) => { tracing::debug!(target: "TODO:nodeX", "ignoring event {:?}", e); - continue }, // Ignore other events None => return Err(eyre::eyre!("Event stream ended unexpectedly")), } From ca1a11997f7287d5d6bdef4ed999803dcb5d1ce3 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 14 Aug 2025 14:53:41 +0800 Subject: [PATCH 06/12] make sequencer not issue blocks by default in tests --- crates/node/src/test_utils.rs | 9 ++++++++- crates/node/tests/e2e.rs | 19 ++++++++----------- crates/node/tests/sync.rs | 3 ++- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/crates/node/src/test_utils.rs b/crates/node/src/test_utils.rs index 6cd397f6..359e58cc 100644 --- a/crates/node/src/test_utils.rs +++ b/crates/node/src/test_utils.rs @@ -157,6 +157,13 @@ pub fn default_test_scroll_rollup_node_config() -> ScrollRollupNodeConfig { } /// Returns a default [`ScrollRollupNodeConfig`] preconfigured for testing with sequencer. +/// It sets `sequencer_args.block_time = 0` so that no blocks are produced automatically. +/// To produce blocks the `build_block` method needs to be invoked. +/// This is so that block production and test scenarios remain predictable. +/// +/// In case this behavior is not wanted, `block_time` can be adjusted to any value > 0 after +/// obtaining the config so that the sequencer node will produce blocks automatically in this +/// interval. pub fn default_sequencer_test_scroll_rollup_node_config() -> ScrollRollupNodeConfig { ScrollRollupNodeConfig { test: true, @@ -166,7 +173,7 @@ pub fn default_sequencer_test_scroll_rollup_node_config() -> ScrollRollupNodeCon engine_driver_args: EngineDriverArgs { en_sync_trigger: 100, sync_at_startup: true }, sequencer_args: SequencerArgs { sequencer_enabled: true, - block_time: 50, + block_time: 0, payload_building_duration: 40, fee_recipient: Default::default(), l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0), diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 4cb33e68..e30caa2a 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -893,8 +893,7 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { let chain_spec = (*SCROLL_DEV).clone(); // Launch 2 nodes: node0=sequencer and node1=follower. - let mut config = default_sequencer_test_scroll_rollup_node_config(); - config.sequencer_args.block_time = 0; + let config = default_sequencer_test_scroll_rollup_node_config(); let (mut nodes, _tasks, _) = setup_engine(config, 2, chain_spec.clone(), false, false).await?; let node0 = nodes.remove(0); let node1 = nodes.remove(0); @@ -999,16 +998,12 @@ async fn can_gossip_over_eth_wire() -> eyre::Result<()> { // Create the chain spec for scroll dev with Feynman activated and a test genesis. let chain_spec = (*SCROLL_DEV).clone(); + let mut config = default_sequencer_test_scroll_rollup_node_config(); + config.sequencer_args.block_time = 40; + // Setup the rollup node manager. - let (mut nodes, _tasks, _) = setup_engine( - default_sequencer_test_scroll_rollup_node_config(), - 2, - chain_spec.clone(), - false, - false, - ) - .await - .unwrap(); + let (mut nodes, _tasks, _) = + setup_engine(config, 2, chain_spec.clone(), false, false).await.unwrap(); let _sequencer = nodes.pop().unwrap(); let follower = nodes.pop().unwrap(); @@ -1043,12 +1038,14 @@ async fn signer_rotation() -> eyre::Result<()> { sequencer_1_config.consensus_args.algorithm = ConsensusAlgorithm::SystemContract; sequencer_1_config.consensus_args.authorized_signer = Some(signer_1_address); sequencer_1_config.signer_args.private_key = Some(signer_1); + sequencer_1_config.sequencer_args.block_time = 40; let mut sequencer_2_config = default_sequencer_test_scroll_rollup_node_config(); sequencer_2_config.test = false; sequencer_2_config.consensus_args.algorithm = ConsensusAlgorithm::SystemContract; sequencer_2_config.consensus_args.authorized_signer = Some(signer_1_address); sequencer_2_config.signer_args.private_key = Some(signer_2); + sequencer_2_config.sequencer_args.block_time = 40; // Setup two sequencer nodes. let (mut nodes, _tasks, _) = diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index 48b4a281..c323b0d7 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -89,7 +89,8 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> { async fn test_should_trigger_pipeline_sync_for_execution_node() { reth_tracing::init_test_tracing(); let node_config = default_test_scroll_rollup_node_config(); - let sequencer_node_config = default_sequencer_test_scroll_rollup_node_config(); + let mut sequencer_node_config = default_sequencer_test_scroll_rollup_node_config(); + sequencer_node_config.sequencer_args.block_time = 40; // Create the chain spec for scroll mainnet with Feynman activated and a test genesis. let chain_spec = (*SCROLL_DEV).clone(); From 7267807c1a9b257952ecdaaf1edf5a7e112a2707 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 14 Aug 2025 17:30:03 +0800 Subject: [PATCH 07/12] improve test --- crates/node/tests/e2e.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index e30caa2a..304957be 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -977,14 +977,11 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { // Since the L1 reorg reverted the L1 message included in block 11, the sequencer // should produce a new block at height 11. node0_rnm_handle.build_block().await; - wait_for_block_sequenced_5s(&mut node0_rnm_events, 11).await?; + wait_for_block_imported_5s(&mut node0_rnm_events, 11).await?; // Assert that the follower node has received the new block from the sequencer node. wait_for_block_imported_5s(&mut node1_rnm_events, 11).await?; - // TODO: why is this not known in the engine without the sleep? - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - assert_eq!(latest_block(&node0).await?.header.number, 11); assert_eq!(latest_block(&node1).await?.header.number, 11); From fd2574a9856911abbebad1e6c635f548fe5e545c Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 27 Aug 2025 10:05:56 +0800 Subject: [PATCH 08/12] fixes after merge --- crates/database/db/src/operations.rs | 2 +- crates/engine/src/driver.rs | 4 +-- crates/engine/src/future/mod.rs | 8 +++-- crates/manager/src/manager/event.rs | 2 +- crates/network/src/event.rs | 2 +- crates/node/tests/e2e.rs | 54 +++++++++++++++++++++------- 6 files changed, 52 insertions(+), 20 deletions(-) diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index f95619bb..1096e614 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -579,7 +579,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { /// /// It can either be an index, which is the queue index of the first message to return, or a hash, /// which is the hash of the first message to return. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum L1MessageStart { /// Start from the provided queue index. Index(u64), diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index 3db36562..eb24d660 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -8,9 +8,7 @@ use crate::{ use alloy_provider::Provider; use futures::{ready, task::AtomicWaker, FutureExt, Stream}; use rollup_node_primitives::{ - BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, -, }; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; @@ -139,7 +137,7 @@ where self.set_head_block_info(l2_head_block_info); if let Some(MeteredFuture { fut, .. }) = self.engine_future.as_ref() { match fut { - EngineFuture::BlockImport(WithBlockNumber { number, .. }) + EngineFuture::ChainImport(WithBlockNumber { number, .. }) if number > &l2_head_block_info.number => { self.engine_future = None diff --git a/crates/engine/src/future/mod.rs b/crates/engine/src/future/mod.rs index 62d2a639..987a1041 100644 --- a/crates/engine/src/future/mod.rs +++ b/crates/engine/src/future/mod.rs @@ -97,7 +97,7 @@ pub(crate) type OptimisticSyncFuture = /// An enum that represents the different types of futures that can be executed on the engine API. /// It can be a block import job, an L1 consolidation job, or a new payload processing. pub(crate) enum EngineFuture { - ChainImport(ChainImportFuture), + ChainImport(WithBlockNumber), L1Consolidation(WithBlockNumber), NewPayload(NewPayloadFuture), OptimisticSync(OptimisticSyncFuture), @@ -112,7 +112,11 @@ impl EngineFuture { where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, { - Self::ChainImport(Box::pin(handle_chain_import(client, chain_import, fcs))) + let highest_block_number = chain_import.chain.last().unwrap().number; + Self::ChainImport(WithBlockNumber::new( + highest_block_number, + Box::pin(handle_chain_import(client, chain_import, fcs)), + )) } pub(crate) fn optimistic_sync(client: Arc, fcs: AlloyForkchoiceState) -> Self diff --git a/crates/manager/src/manager/event.rs b/crates/manager/src/manager/event.rs index fa588492..dfa970d8 100644 --- a/crates/manager/src/manager/event.rs +++ b/crates/manager/src/manager/event.rs @@ -8,7 +8,7 @@ use scroll_engine::ConsolidationOutcome; use scroll_network::NewBlockWithPeer; /// An event that can be emitted by the rollup node manager. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum RollupManagerEvent { /// A new block has been received from the network. NewBlockReceived(NewBlockWithPeer), diff --git a/crates/network/src/event.rs b/crates/network/src/event.rs index 024388a6..20695b66 100644 --- a/crates/network/src/event.rs +++ b/crates/network/src/event.rs @@ -3,7 +3,7 @@ use reth_network_api::PeerId; use reth_scroll_primitives::ScrollBlock; /// A new block with the peer id that it was received from. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct NewBlockWithPeer { pub peer_id: PeerId, pub block: ScrollBlock, diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 6d37513d..4e41c0ba 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -1000,24 +1000,37 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { // Send the L1 message to the sequencer node. node0_l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?; - wait_for_event_5s(&mut node0_rnm_events, RollupManagerEvent::L1MessageIndexed(0)).await?; node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + wait_for_event_5s( + &mut node0_rnm_events, + RollupManagerEvent::ChainOrchestratorEvent(ChainOrchestratorEvent::L1MessageCommitted(0)), + ) + .await?; // Send L1 the L1 message to follower node. node1_l1_watcher_tx.send(Arc::new(l1_message_notification)).await?; - wait_for_event_5s(&mut node1_rnm_events, RollupManagerEvent::L1MessageIndexed(0)).await?; node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + wait_for_event_5s( + &mut node1_rnm_events, + RollupManagerEvent::ChainOrchestratorEvent(ChainOrchestratorEvent::L1MessageCommitted(0)), + ) + .await?; // Build block that contains the L1 message. + let mut block11_before_reorg = None; node0_rnm_handle.build_block().await; wait_for_event_predicate_5s(&mut node0_rnm_events, |e| { - if let RollupManagerEvent::BlockImported(block) = e { - block.header.number == 11 && + if let RollupManagerEvent::BlockSequenced(block) = e { + if block.header.number == 11 && block.body.transactions.len() == 1 && block.body.transactions.iter().any(|tx| tx.is_l1_message()) - } else { - false + { + block11_before_reorg = Some(block.header.hash_slow()); + return true; + } } + + false }) .await?; @@ -1029,8 +1042,13 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { // Assert that the follower node has received the latest block from the sequencer node. wait_for_block_imported_5s(&mut node1_rnm_events, 15).await?; - assert_eq!(latest_block(&node0).await?.header.number, 15); - assert_eq!(latest_block(&node1).await?.header.number, 15); + // Assert both nodes are at block 15. + let node0_latest_block = latest_block(&node0).await?; + assert_eq!(node0_latest_block.header.number, 15); + assert_eq!( + node0_latest_block.header.hash_slow(), + latest_block(&node1).await?.header.hash_slow() + ); // Issue and wait for the reorg. node0_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; @@ -1045,13 +1063,26 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { // Since the L1 reorg reverted the L1 message included in block 11, the sequencer // should produce a new block at height 11. node0_rnm_handle.build_block().await; - wait_for_block_imported_5s(&mut node0_rnm_events, 11).await?; + wait_for_block_sequenced_5s(&mut node0_rnm_events, 11).await?; // Assert that the follower node has received the new block from the sequencer node. wait_for_block_imported_5s(&mut node1_rnm_events, 11).await?; - assert_eq!(latest_block(&node0).await?.header.number, 11); - assert_eq!(latest_block(&node1).await?.header.number, 11); + // TODO: use eventually instead of sleep + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // TODO: wait for L2ChainCommitted? + + // Assert both nodes are at block 11. + let node0_latest_block = latest_block(&node0).await?; + assert_eq!(node0_latest_block.header.number, 11); + assert_eq!( + node0_latest_block.header.hash_slow(), + latest_block(&node1).await?.header.hash_slow() + ); + + // Assert that block 11 has a different hash after the reorg. + assert_ne!(block11_before_reorg.unwrap(), node0_latest_block.header.hash_slow()); Ok(()) } @@ -1067,7 +1098,6 @@ async fn can_gossip_over_eth_wire() -> eyre::Result<()> { config.sequencer_args.block_time = 40; // Setup the rollup node manager. - let config = default_sequencer_test_scroll_rollup_node_config(); let (mut nodes, _tasks, _) = setup_engine(config, 2, chain_spec.clone(), false, false).await.unwrap(); let _sequencer = nodes.pop().unwrap(); From ed0f4db50ce736e33f4839730c03d265effbb3b3 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 27 Aug 2025 10:37:43 +0800 Subject: [PATCH 09/12] improve test --- crates/node/tests/e2e.rs | 66 ++++++++++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 9 deletions(-) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 4e41c0ba..05be97f3 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -1068,18 +1068,29 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { // Assert that the follower node has received the new block from the sequencer node. wait_for_block_imported_5s(&mut node1_rnm_events, 11).await?; - // TODO: use eventually instead of sleep - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - - // TODO: wait for L2ChainCommitted? + // Assert ChainOrchestrator finished processing block. + wait_for_chain_committed_5s(&mut node0_rnm_events, 11, true).await?; + wait_for_chain_committed_5s(&mut node1_rnm_events, 11, true).await?; // Assert both nodes are at block 11. + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Waiting for latest block on node0", + || async { latest_block(&node0).await.unwrap().header.number == 11 }, + ) + .await; let node0_latest_block = latest_block(&node0).await?; - assert_eq!(node0_latest_block.header.number, 11); - assert_eq!( - node0_latest_block.header.hash_slow(), - latest_block(&node1).await?.header.hash_slow() - ); + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Waiting for latest block on node1", + || async { + node0_latest_block.header.hash_slow() == + latest_block(&node1).await.unwrap().header.hash_slow() + }, + ) + .await; // Assert that block 11 has a different hash after the reorg. assert_ne!(block11_before_reorg.unwrap(), node0_latest_block.header.hash_slow()); @@ -1325,6 +1336,43 @@ async fn wait_for_block_imported_5s( wait_for_block_imported(events, block_number, Duration::from_secs(5)).await } +async fn wait_for_chain_committed_5s( + events: &mut EventStream, + expected_block_number: u64, + expected_consolidated: bool, +) -> eyre::Result<()> { + wait_for_chain_committed( + events, + expected_block_number, + expected_consolidated, + Duration::from_secs(5), + ) + .await +} + +async fn wait_for_chain_committed( + events: &mut EventStream, + expected_block_number: u64, + expected_consolidated: bool, + timeout: Duration, +) -> eyre::Result<()> { + wait_for_event_predicate( + events, + |e| { + if let RollupManagerEvent::ChainOrchestratorEvent(c) = e { + if let ChainOrchestratorEvent::L2ChainCommitted(block_info, _, consolidated) = c { + return block_info.block_info.number == expected_block_number && + expected_consolidated == consolidated; + } + } + + false + }, + timeout, + ) + .await +} + async fn wait_for_event_predicate( event_stream: &mut EventStream, mut predicate: impl FnMut(RollupManagerEvent) -> bool, From c3dddb051892ccc12187d35eba8ae8956b660deb Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 27 Aug 2025 11:13:54 +0800 Subject: [PATCH 10/12] fixes after merge --- crates/derivation-pipeline/src/lib.rs | 4 ++-- crates/node/tests/e2e.rs | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index d8d63b66..0ebf6f59 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -24,8 +24,8 @@ use core::{ }; use futures::{FutureExt, Stream}; use rollup_node_primitives::{ - BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithFinalizedBatchInfo, - WithFinalizedBlockNumber, + BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, + WithFinalizedBatchInfo, WithFinalizedBlockNumber, }; use rollup_node_providers::{BlockDataProvider, L1Provider}; use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes}; diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 8ffddae5..bf7fd574 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -1402,11 +1402,12 @@ async fn wait_for_chain_committed( wait_for_event_predicate( events, |e| { - if let RollupManagerEvent::ChainOrchestratorEvent(c) = e { - if let ChainOrchestratorEvent::L2ChainCommitted(block_info, _, consolidated) = c { - return block_info.block_info.number == expected_block_number && - expected_consolidated == consolidated; - } + if let RollupManagerEvent::ChainOrchestratorEvent( + ChainOrchestratorEvent::L2ChainCommitted(block_info, _, consolidated), + ) = e + { + return block_info.block_info.number == expected_block_number && + expected_consolidated == consolidated; } false From d41332176486c2a3ad2f0468ebaf1c25e8289db1 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 27 Aug 2025 18:09:57 +0800 Subject: [PATCH 11/12] improve test helper functions --- crates/node/tests/e2e.rs | 65 ++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index bf7fd574..e202318f 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -1021,7 +1021,8 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { // Let the sequencer build 10 blocks before performing the reorg process. for i in 1..=10 { node0_rnm_handle.build_block().await; - wait_for_block_sequenced_5s(&mut node0_rnm_events, i).await?; + let b = wait_for_block_sequenced_5s(&mut node0_rnm_events, i).await?; + println!("Sequenced block {} {:?}", b.header.number, b.header.hash_slow()); } // Assert that the follower node has received all 10 blocks from the sequencer node. @@ -1099,10 +1100,6 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { node1_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; wait_for_event_5s(&mut node1_rnm_events, RollupManagerEvent::Reorg(9)).await?; - // TODO: this can only become true if we do https://github.com/scroll-tech/rollup-node/issues/254 - // assert_eq!(latest_block(&node0).await?.header.number, 10); - // assert_eq!(latest_block(&node1).await?.header.number, 10); - // Since the L1 reorg reverted the L1 message included in block 11, the sequencer // should produce a new block at height 11. node0_rnm_handle.build_block().await; @@ -1116,24 +1113,9 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { wait_for_chain_committed_5s(&mut node1_rnm_events, 11, true).await?; // Assert both nodes are at block 11. - eventually( - Duration::from_secs(5), - Duration::from_millis(100), - "Waiting for latest block on node0", - || async { latest_block(&node0).await.unwrap().header.number == 11 }, - ) - .await; + assert_latest_block_on_rpc_by_number(&node0, 11).await; let node0_latest_block = latest_block(&node0).await?; - eventually( - Duration::from_secs(5), - Duration::from_millis(100), - "Waiting for latest block on node1", - || async { - node0_latest_block.header.hash_slow() == - latest_block(&node1).await.unwrap().header.hash_slow() - }, - ) - .await; + assert_latest_block_on_rpc_by_hash(&node1, node0_latest_block.header.hash_slow()).await; // Assert that block 11 has a different hash after the reorg. assert_ne!(block11_before_reorg.unwrap(), node0_latest_block.header.hash_slow()); @@ -1500,3 +1482,42 @@ where interval.tick().await; } } + +async fn assert_latest_block_on_rpc_by_number( + node: &NodeHelperType< + ScrollRollupNode, + BlockchainProvider>, + >, + block_number: u64, +) { + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Waiting for latest block by number on node", + || async { + println!( + "Latest block number: {}, hash: {}", + latest_block(node).await.unwrap().header.number, + latest_block(node).await.unwrap().header.hash_slow() + ); + latest_block(node).await.unwrap().header.number == block_number + }, + ) + .await; +} + +async fn assert_latest_block_on_rpc_by_hash( + node: &NodeHelperType< + ScrollRollupNode, + BlockchainProvider>, + >, + block_hash: B256, +) { + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Waiting for latest block by hash on node", + || async { latest_block(node).await.unwrap().header.hash_slow() == block_hash }, + ) + .await; +} From a0bbea3a75b9fe1bb13b4718e53cf216d74b823e Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Wed, 27 Aug 2025 13:29:21 +0200 Subject: [PATCH 12/12] feat: answer comments --- crates/engine/src/driver.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index 0d05dfe9..8f6dc0de 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -129,12 +129,22 @@ where reorged_unsafe_head: Option, reorged_safe_head: Option, ) { - // On an unsafe head reorg: clear the payload building future, reset the unsafe head and - // drop the engine future if it's a `NewPayload` or `BlockImport` with block number > L2 - // reorged number. + // On an unsafe head reorg. if let Some(l2_head_block_info) = reorged_unsafe_head { + // clear the payload building future. self.payload_building_future = None; + + // retain only blocks from chain imports for which the block number <= L2 reorged + // number. + for chain_import in &mut self.chain_imports { + chain_import.chain.retain(|block| block.number <= l2_head_block_info.number); + } + + // reset the unsafe head. self.set_head_block_info(l2_head_block_info); + + // drop the engine future if it's a `NewPayload` or `BlockImport` with block number > + // L2 reorged number. if let Some(MeteredFuture { fut, .. }) = self.engine_future.as_ref() { match fut { EngineFuture::ChainImport(WithBlockNumber { number, .. })