diff --git a/Cargo.lock b/Cargo.lock index c15f5a48..a397443a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2596,6 +2596,33 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "color-eyre" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5920befb47832a6d61ee3a3a846565cfa39b331331e68a3b1d1116630f2f26d" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8b88ea9df13354b55bc7234ebcce36e6ef896aca2e42a15de9e10edce01b427" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -6106,6 +6133,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dd4f4a2c8405440fd0462561f0e5806bd0f77e86f51c761481bdd4018b545e" + [[package]] name = "p256" version = "0.13.2" @@ -10337,6 +10370,7 @@ dependencies = [ "alloy-provider", "alloy-rpc-client", "alloy-rpc-types-engine 1.0.24", + "alloy-rpc-types-eth", "alloy-signer", "alloy-signer-aws", "alloy-signer-local", @@ -10345,6 +10379,7 @@ dependencies = [ "aws-config", "aws-sdk-kms", "clap", + "color-eyre", "eyre", "futures", "reqwest", @@ -10392,6 +10427,7 @@ dependencies = [ "scroll-alloy-hardforks", "scroll-alloy-network", "scroll-alloy-provider", + "scroll-alloy-rpc-types", "scroll-alloy-rpc-types-engine", "scroll-db", "scroll-derivation-pipeline", @@ -12731,6 +12767,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-error" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b1581020d7a273442f5b45074a6a57d5757ad0a47dac0e9f0bd57b81936f3db" +dependencies = [ + "tracing", + "tracing-subscriber 0.3.19", +] + [[package]] name = "tracing-futures" version = "0.2.5" diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 1e1fbf56..55ccf0f5 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -586,7 +586,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { /// /// It can either be an index, which is the queue index of the first message to return, or a hash, /// which is the hash of the first message to return. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum L1MessageStart { /// Start from the provided queue index. Index(u64), diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 6372ef04..0ebf6f59 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -24,8 +24,8 @@ use core::{ }; use futures::{FutureExt, Stream}; use rollup_node_primitives::{ - BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithFinalizedBatchInfo, - WithFinalizedBlockNumber, + BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, + WithFinalizedBatchInfo, WithFinalizedBlockNumber, }; use rollup_node_providers::{BlockDataProvider, L1Provider}; use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes}; @@ -186,7 +186,7 @@ impl

Stream for DerivationPipeline

where P: L1Provider + Clone + Unpin + Send + Sync + 'static, { - type Item = ScrollPayloadAttributesWithBatchInfo; + type Item = WithBlockNumber; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -198,7 +198,7 @@ where // return attributes from the queue if any. if let Some(attribute) = this.attributes_queue.pop_front() { - return Poll::Ready(Some(attribute.inner)) + return Poll::Ready(Some(attribute)) } // if future is None and the batch queue is empty, store the waker and return. @@ -487,8 +487,10 @@ mod tests { // check the correctness of the last attribute. let mut attribute = ScrollPayloadAttributes::default(); - while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) = - pipeline.next().await + while let Some(WithBlockNumber { + inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }, + .. + }) = pipeline.next().await { if a.payload_attributes.timestamp == 1696935657 { attribute = a; @@ -545,8 +547,10 @@ mod tests { // check the correctness of the last attribute. let mut attribute = ScrollPayloadAttributes::default(); - while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) = - pipeline.next().await + while let Some(WithBlockNumber { + inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }, + .. + }) = pipeline.next().await { if a.payload_attributes.timestamp == 1696935657 { attribute = a; diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index 73f4526a..8f6dc0de 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -8,7 +8,7 @@ use crate::{ use alloy_provider::Provider; use futures::{ready, task::AtomicWaker, FutureExt, Stream}; use rollup_node_primitives::{ - BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, + BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, }; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; @@ -38,7 +38,7 @@ pub struct EngineDriver { /// Block building duration. block_building_duration: Duration, /// The pending payload attributes derived from batches on L1. - l1_payload_attributes: VecDeque, + l1_payload_attributes: VecDeque>, /// The pending block imports received over the network. chain_imports: VecDeque, /// The latest optimistic sync target. @@ -121,6 +121,67 @@ where } } + /// Handle L1 reorg, with the L1 block number reorged to, and whether this reorged the head or + /// batches. + pub fn handle_l1_reorg( + &mut self, + l1_block_number: u64, + reorged_unsafe_head: Option, + reorged_safe_head: Option, + ) { + // On an unsafe head reorg. + if let Some(l2_head_block_info) = reorged_unsafe_head { + // clear the payload building future. + self.payload_building_future = None; + + // retain only blocks from chain imports for which the block number <= L2 reorged + // number. + for chain_import in &mut self.chain_imports { + chain_import.chain.retain(|block| block.number <= l2_head_block_info.number); + } + + // reset the unsafe head. + self.set_head_block_info(l2_head_block_info); + + // drop the engine future if it's a `NewPayload` or `BlockImport` with block number > + // L2 reorged number. + if let Some(MeteredFuture { fut, .. }) = self.engine_future.as_ref() { + match fut { + EngineFuture::ChainImport(WithBlockNumber { number, .. }) + if number > &l2_head_block_info.number => + { + self.engine_future = None + } + // `NewPayload` future is ONLY instantiated when the payload building future is + // done, and we want to issue the payload to the EN. Thus, we also clear it on a + // L2 reorg. + EngineFuture::NewPayload(_) => self.engine_future = None, + _ => {} + } + } + } + + // On a safe head reorg: reset the safe head. + if let Some(safe_block_info) = reorged_safe_head { + self.set_safe_block_info(safe_block_info); + } + + // drop the engine future if it's a `L1Consolidation` future associated with a L1 block + // number > l1_block_number. + if matches!( + self.engine_future.as_ref(), + Some(MeteredFuture { + fut: EngineFuture::L1Consolidation(WithBlockNumber { number, .. }), + .. + }) if number > &l1_block_number + ) { + self.engine_future = None; + } + + // retain the L1 payload attributes with block number <= L1 block. + self.l1_payload_attributes.retain(|attribute| attribute.number <= l1_block_number); + } + /// Handles a block import request by adding it to the queue and waking up the driver. pub fn handle_chain_import(&mut self, chain_import: ChainImport) { tracing::trace!(target: "scroll::engine", head = %chain_import.chain.last().unwrap().hash_slow(), "new chain import request received"); @@ -145,7 +206,10 @@ where /// Handles a [`ScrollPayloadAttributes`] sourced from L1 by initiating a task sending the /// attribute to the EN via the [`EngineDriver`]. - pub fn handle_l1_consolidation(&mut self, attributes: ScrollPayloadAttributesWithBatchInfo) { + pub fn handle_l1_consolidation( + &mut self, + attributes: WithBlockNumber, + ) { self.l1_payload_attributes.push_back(attributes); self.waker.wake(); } @@ -193,7 +257,7 @@ where self.metrics.block_import_duration.record(duration.as_secs_f64()); // Return the block import outcome - return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome) + return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to import block"); @@ -223,7 +287,7 @@ where // record the metric. self.metrics.l1_consolidation_duration.record(duration.as_secs_f64()); - return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)) + return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome)); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to consolidate block derived from L1"); @@ -250,7 +314,7 @@ where self.metrics.build_new_payload_duration.record(duration.as_secs_f64()); self.metrics.gas_per_block.record(block.gas_used as f64); - return Some(EngineDriverEvent::NewPayload(block)) + return Some(EngineDriverEvent::NewPayload(block)); } Err(err) => { tracing::error!(target: "scroll::engine", ?err, "failed to build new payload"); diff --git a/crates/engine/src/error.rs b/crates/engine/src/error.rs index 72e4cca8..40d6c79e 100644 --- a/crates/engine/src/error.rs +++ b/crates/engine/src/error.rs @@ -1,5 +1,5 @@ use alloy_rpc_types_engine::PayloadError; -use rollup_node_primitives::ScrollPayloadAttributesWithBatchInfo; +use rollup_node_primitives::{ScrollPayloadAttributesWithBatchInfo, WithBlockNumber}; use scroll_alloy_provider::ScrollEngineApiError; use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes; @@ -21,7 +21,7 @@ pub enum EngineDriverError { /// The payload id field is missing in the forkchoice update response for an L1 consolidation /// job. #[error("Forkchoice update response missing payload id for L1 consolidation job")] - L1ConsolidationMissingPayloadId(ScrollPayloadAttributesWithBatchInfo), + L1ConsolidationMissingPayloadId(WithBlockNumber), /// The payload id field is missing in the forkchoice update response for a payload building /// job. #[error("Forkchoice update response missing payload id for payload building job")] diff --git a/crates/engine/src/future/mod.rs b/crates/engine/src/future/mod.rs index 096e40ed..c3825b29 100644 --- a/crates/engine/src/future/mod.rs +++ b/crates/engine/src/future/mod.rs @@ -11,7 +11,7 @@ use reth_scroll_engine_primitives::try_into_block; use reth_scroll_primitives::ScrollBlock; use rollup_node_primitives::{ BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages, MeteredFuture, - ScrollPayloadAttributesWithBatchInfo, + ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, }; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; @@ -47,7 +47,7 @@ type L1ConsolidationFuture = Pin> + Send>>; /// An enum that represents the different outcomes of an L1 consolidation job. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum ConsolidationOutcome { /// Represents a successful consolidation outcome with the consolidated block info and batch /// info. @@ -97,8 +97,8 @@ pub(crate) type OptimisticSyncFuture = /// An enum that represents the different types of futures that can be executed on the engine API. /// It can be a block import job, an L1 consolidation job, or a new payload processing. pub(crate) enum EngineFuture { - ChainImport(ChainImportFuture), - L1Consolidation(L1ConsolidationFuture), + ChainImport(WithBlockNumber), + L1Consolidation(WithBlockNumber), NewPayload(NewPayloadFuture), OptimisticSync(OptimisticSyncFuture), } @@ -112,7 +112,11 @@ impl EngineFuture { where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, { - Self::ChainImport(Box::pin(handle_chain_import(client, chain_import, fcs))) + let highest_block_number = chain_import.chain.last().unwrap().number; + Self::ChainImport(WithBlockNumber::new( + highest_block_number, + Box::pin(handle_chain_import(client, chain_import, fcs)), + )) } pub(crate) fn optimistic_sync(client: Arc, fcs: AlloyForkchoiceState) -> Self @@ -127,18 +131,21 @@ impl EngineFuture { client: Arc, execution_payload_provider: P, fcs: ForkchoiceState, - payload_attributes: ScrollPayloadAttributesWithBatchInfo, + payload_attributes: WithBlockNumber, ) -> Self where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, P: Provider + Unpin + Send + Sync + 'static, { - Self::L1Consolidation(Box::pin(handle_payload_attributes( - client, - execution_payload_provider, - fcs, - payload_attributes, - ))) + Self::L1Consolidation(WithBlockNumber::new( + payload_attributes.number, + Box::pin(handle_payload_attributes( + client, + execution_payload_provider, + fcs, + payload_attributes, + )), + )) } /// Creates a new [`EngineFuture::NewPayload`] future from the provided parameters. @@ -162,8 +169,8 @@ impl Future for EngineFuture { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match this { - Self::ChainImport(fut) => fut.as_mut().poll(cx).map(Into::into), - Self::L1Consolidation(fut) => fut.as_mut().poll(cx).map(Into::into), + Self::ChainImport(fut) => fut.inner.as_mut().poll(cx).map(Into::into), + Self::L1Consolidation(fut) => fut.inner.as_mut().poll(cx).map(Into::into), Self::NewPayload(fut) => fut.as_mut().poll(cx).map(Into::into), Self::OptimisticSync(fut) => fut.as_mut().poll(cx).map(Into::into), } @@ -258,7 +265,7 @@ async fn handle_payload_attributes( client: Arc, provider: P, fcs: ForkchoiceState, - payload_attributes_with_batch_info: ScrollPayloadAttributesWithBatchInfo, + payload_attributes_with_batch_info: WithBlockNumber, ) -> Result where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, @@ -267,7 +274,7 @@ where tracing::trace!(target: "scroll::engine::future", ?fcs, ?payload_attributes_with_batch_info, "handling payload attributes"); let ScrollPayloadAttributesWithBatchInfo { mut payload_attributes, batch_info } = - payload_attributes_with_batch_info.clone(); + payload_attributes_with_batch_info.inner.clone(); let maybe_execution_payload = provider .get_block((fcs.safe_block_info().number + 1).into()) diff --git a/crates/manager/src/manager/event.rs b/crates/manager/src/manager/event.rs index 6e18c676..dfa970d8 100644 --- a/crates/manager/src/manager/event.rs +++ b/crates/manager/src/manager/event.rs @@ -8,7 +8,7 @@ use scroll_engine::ConsolidationOutcome; use scroll_network::NewBlockWithPeer; /// An event that can be emitted by the rollup node manager. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum RollupManagerEvent { /// A new block has been received from the network. NewBlockReceived(NewBlockWithPeer), diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 2c6b1b5c..f20c7a24 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -290,15 +290,12 @@ where l2_head_block_info, l2_safe_block_info, } => { - // Update the [`EngineDriver`] fork choice state with the new L2 head info. - if let Some(l2_head_block_info) = l2_head_block_info { - self.engine.set_head_block_info(l2_head_block_info); - } - - // Update the [`EngineDriver`] fork choice state with the new L2 safe info. - if let Some(safe_block_info) = l2_safe_block_info { - self.engine.set_safe_block_info(safe_block_info); - } + // Handle the reorg in the engine driver. + self.engine.handle_l1_reorg( + l1_block_number, + l2_head_block_info, + l2_safe_block_info, + ); // Update the [`Sequencer`] with the new L1 head info and queue index. if let Some(sequencer) = self.sequencer.as_mut() { diff --git a/crates/network/src/event.rs b/crates/network/src/event.rs index ae81484a..20695b66 100644 --- a/crates/network/src/event.rs +++ b/crates/network/src/event.rs @@ -3,7 +3,7 @@ use reth_network_api::PeerId; use reth_scroll_primitives::ScrollBlock; /// A new block with the peer id that it was received from. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct NewBlockWithPeer { pub peer_id: PeerId, pub block: ScrollBlock, diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 0b18b9c3..9cdf45c0 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -77,6 +77,7 @@ reth-provider = { workspace = true, optional = true } reth-rpc-server-types = { workspace = true, optional = true } scroll-alloy-rpc-types-engine = { workspace = true, optional = true } scroll-derivation-pipeline = { workspace = true, optional = true } +scroll-alloy-rpc-types = { git = "https://github.com/scroll-tech/reth.git", default-features = false } scroll-db.workspace = true scroll-engine.workspace = true @@ -107,6 +108,8 @@ reth-tracing.workspace = true rollup-node = { workspace = true, features = ["test-utils"] } scroll-alloy-rpc-types-engine.workspace = true serde_json = { version = "1.0.94", default-features = false, features = ["alloc"] } +color-eyre = "0.6" +alloy-rpc-types-eth = { workspace = true } [features] test-utils = [ diff --git a/crates/node/src/test_utils.rs b/crates/node/src/test_utils.rs index 5d1519de..153e47c2 100644 --- a/crates/node/src/test_utils.rs +++ b/crates/node/src/test_utils.rs @@ -161,6 +161,13 @@ pub fn default_test_scroll_rollup_node_config() -> ScrollRollupNodeConfig { } /// Returns a default [`ScrollRollupNodeConfig`] preconfigured for testing with sequencer. +/// It sets `sequencer_args.block_time = 0` so that no blocks are produced automatically. +/// To produce blocks the `build_block` method needs to be invoked. +/// This is so that block production and test scenarios remain predictable. +/// +/// In case this behavior is not wanted, `block_time` can be adjusted to any value > 0 after +/// obtaining the config so that the sequencer node will produce blocks automatically in this +/// interval. pub fn default_sequencer_test_scroll_rollup_node_config() -> ScrollRollupNodeConfig { ScrollRollupNodeConfig { test: true, @@ -174,7 +181,7 @@ pub fn default_sequencer_test_scroll_rollup_node_config() -> ScrollRollupNodeCon }, sequencer_args: SequencerArgs { sequencer_enabled: true, - block_time: 50, + block_time: 0, payload_building_duration: 40, fee_recipient: Default::default(), l1_message_inclusion_mode: L1MessageInclusionMode::BlockDepth(0), diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 78a798ef..e202318f 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -2,12 +2,17 @@ use alloy_eips::BlockNumberOrTag; use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256}; +use alloy_rpc_types_eth::Block; use alloy_signer::Signer; use alloy_signer_local::PrivateKeySigner; +use eyre::Ok; use futures::StreamExt; use reth_chainspec::EthChainSpec; +use reth_e2e_test_utils::{NodeHelperType, TmpDB}; use reth_network::{NetworkConfigBuilder, NetworkEventListenerProvider, Peers, PeersInfo}; use reth_network_api::block::EthWireProvider; +use reth_node_api::NodeTypesWithDBAdapter; +use reth_provider::providers::BlockchainProvider; use reth_rpc_api::EthApiServer; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::ScrollNetworkPrimitives; @@ -21,7 +26,7 @@ use rollup_node::{ }, BeaconProviderArgs, ChainOrchestratorArgs, ConsensusAlgorithm, ConsensusArgs, DatabaseArgs, EngineDriverArgs, GasPriceOracleArgs, L1ProviderArgs, NetworkArgs as ScrollNetworkArgs, - RollupNodeContext, ScrollRollupNodeConfig, SequencerArgs, + RollupNodeContext, ScrollRollupNode, ScrollRollupNodeConfig, SequencerArgs, }; use rollup_node_chain_orchestrator::ChainOrchestratorEvent; use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent}; @@ -30,6 +35,7 @@ use rollup_node_providers::BlobSource; use rollup_node_sequencer::L1MessageInclusionMode; use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; +use scroll_alloy_rpc_types::Transaction as ScrollAlloyTransaction; use scroll_network::{NewBlockWithPeer, SCROLL_MAINNET}; use scroll_wire::{ScrollWireConfig, ScrollWireProtocolHandler}; use std::{path::PathBuf, sync::Arc, time::Duration}; @@ -299,25 +305,6 @@ async fn can_penalize_peer_for_invalid_block() { .await; } -/// Helper function to wait until a predicate is true or a timeout occurs. -pub async fn eventually(timeout: Duration, tick: Duration, message: &str, mut predicate: F) -where - F: FnMut() -> Fut, - Fut: std::future::Future, -{ - let mut interval = time::interval(tick); - let start = time::Instant::now(); - loop { - if predicate().await { - return; - } - - assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}"); - - interval.tick().await; - } -} - #[allow(clippy::large_stack_frames)] #[tokio::test] async fn can_forward_tx_to_sequencer() { @@ -1011,94 +998,127 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { #[allow(clippy::large_stack_frames)] #[tokio::test] -async fn can_handle_reorgs_while_sequencing() -> eyre::Result<()> { +async fn can_handle_l1_message_reorg() -> eyre::Result<()> { reth_tracing::init_test_tracing(); + color_eyre::install()?; let chain_spec = (*SCROLL_DEV).clone(); - // Launch a node - let mut config = default_sequencer_test_scroll_rollup_node_config(); - config.sequencer_args.block_time = 0; - config.beacon_provider_args.url = Some( - "http://dummy:8545" - .parse() - .expect("valid url that will not be used as test batches use calldata"), - ); - config.engine_driver_args.sync_at_startup = false; - let (mut nodes, _tasks, _) = setup_engine(config, 1, chain_spec.clone(), false, false).await?; - let node = nodes.pop().unwrap(); + // Launch 2 nodes: node0=sequencer and node1=follower. + let config = default_sequencer_test_scroll_rollup_node_config(); + let (mut nodes, _tasks, _) = setup_engine(config, 2, chain_spec.clone(), false, false).await?; + let node0 = nodes.remove(0); + let node1 = nodes.remove(0); - let handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); - let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); - let mut rnm_events = handle.get_event_listener().await?; + // Get handles + let node0_rnm_handle = node0.inner.add_ons_handle.rollup_manager_handle.clone(); + let mut node0_rnm_events = node0_rnm_handle.get_event_listener().await?; + let node0_l1_watcher_tx = node0.inner.add_ons_handle.l1_watcher_tx.as_ref().unwrap(); - // Send an L1 message. - let message = TxL1Message { - queue_index: 0, - gas_limit: 21000, - to: Default::default(), - value: Default::default(), - sender: address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), - input: Default::default(), - }; + let node1_rnm_handle = node1.inner.add_ons_handle.rollup_manager_handle.clone(); + let mut node1_rnm_events = node1_rnm_handle.get_event_listener().await?; + let node1_l1_watcher_tx = node1.inner.add_ons_handle.l1_watcher_tx.as_ref().unwrap(); // Let the sequencer build 10 blocks before performing the reorg process. - handle.build_block().await; - let mut i = 0; - loop { - if let Some(RollupManagerEvent::BlockSequenced(_)) = rnm_events.next().await { - i += 1; - if i == 10 { - break - } - handle.build_block().await; - } + for i in 1..=10 { + node0_rnm_handle.build_block().await; + let b = wait_for_block_sequenced_5s(&mut node0_rnm_events, i).await?; + println!("Sequenced block {} {:?}", b.header.number, b.header.hash_slow()); } + // Assert that the follower node has received all 10 blocks from the sequencer node. + wait_for_block_imported_5s(&mut node1_rnm_events, 10).await?; + // Send a L1 message and wait for it to be indexed. - l1_watcher_tx - .send(Arc::new(L1Notification::L1Message { message, block_number: 10, block_timestamp: 0 })) - .await?; - loop { - if let Some(RollupManagerEvent::ChainOrchestratorEvent( - ChainOrchestratorEvent::L1MessageCommitted(index), - )) = rnm_events.next().await - { - assert_eq!(index, 0); - break - } - } - l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + let l1_message_notification = L1Notification::L1Message { + message: TxL1Message { + queue_index: 0, + gas_limit: 21000, + to: Default::default(), + value: Default::default(), + sender: address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), + input: Default::default(), + }, + block_number: 10, + block_timestamp: 0, + }; - // Wait for block that contains the L1 message. - handle.build_block().await; - let l2_reorged_height; - loop { - if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { - if block.body.transactions.iter().any(|tx| tx.is_l1_message()) { - l2_reorged_height = block.header.number; - break + // Send the L1 message to the sequencer node. + node0_l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?; + node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + wait_for_event_5s( + &mut node0_rnm_events, + RollupManagerEvent::ChainOrchestratorEvent(ChainOrchestratorEvent::L1MessageCommitted(0)), + ) + .await?; + + // Send L1 the L1 message to follower node. + node1_l1_watcher_tx.send(Arc::new(l1_message_notification)).await?; + node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + wait_for_event_5s( + &mut node1_rnm_events, + RollupManagerEvent::ChainOrchestratorEvent(ChainOrchestratorEvent::L1MessageCommitted(0)), + ) + .await?; + + // Build block that contains the L1 message. + let mut block11_before_reorg = None; + node0_rnm_handle.build_block().await; + wait_for_event_predicate_5s(&mut node0_rnm_events, |e| { + if let RollupManagerEvent::BlockSequenced(block) = e { + if block.header.number == 11 && + block.body.transactions.len() == 1 && + block.body.transactions.iter().any(|tx| tx.is_l1_message()) + { + block11_before_reorg = Some(block.header.hash_slow()); + return true; } - handle.build_block().await; } + + false + }) + .await?; + + for i in 12..=15 { + node0_rnm_handle.build_block().await; + wait_for_block_sequenced_5s(&mut node0_rnm_events, i).await?; } + // Assert that the follower node has received the latest block from the sequencer node. + wait_for_block_imported_5s(&mut node1_rnm_events, 15).await?; + + // Assert both nodes are at block 15. + let node0_latest_block = latest_block(&node0).await?; + assert_eq!(node0_latest_block.header.number, 15); + assert_eq!( + node0_latest_block.header.hash_slow(), + latest_block(&node1).await?.header.hash_slow() + ); + // Issue and wait for the reorg. - l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; - loop { - if let Some(RollupManagerEvent::Reorg(height)) = rnm_events.next().await { - assert_eq!(height, 9); - break - } - } + node0_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; + wait_for_event_5s(&mut node0_rnm_events, RollupManagerEvent::Reorg(9)).await?; + node1_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(9))).await?; + wait_for_event_5s(&mut node1_rnm_events, RollupManagerEvent::Reorg(9)).await?; - // Get the next sequenced L2 block. - handle.build_block().await; - loop { - if let Some(RollupManagerEvent::BlockSequenced(block)) = rnm_events.next().await { - assert_eq!(block.number, l2_reorged_height); - break - } - } + // Since the L1 reorg reverted the L1 message included in block 11, the sequencer + // should produce a new block at height 11. + node0_rnm_handle.build_block().await; + wait_for_block_sequenced_5s(&mut node0_rnm_events, 11).await?; + + // Assert that the follower node has received the new block from the sequencer node. + wait_for_block_imported_5s(&mut node1_rnm_events, 11).await?; + + // Assert ChainOrchestrator finished processing block. + wait_for_chain_committed_5s(&mut node0_rnm_events, 11, true).await?; + wait_for_chain_committed_5s(&mut node1_rnm_events, 11, true).await?; + + // Assert both nodes are at block 11. + assert_latest_block_on_rpc_by_number(&node0, 11).await; + let node0_latest_block = latest_block(&node0).await?; + assert_latest_block_on_rpc_by_hash(&node1, node0_latest_block.header.hash_slow()).await; + + // Assert that block 11 has a different hash after the reorg. + assert_ne!(block11_before_reorg.unwrap(), node0_latest_block.header.hash_slow()); Ok(()) } @@ -1110,8 +1130,10 @@ async fn can_gossip_over_eth_wire() -> eyre::Result<()> { // Create the chain spec for scroll dev with Feynman activated and a test genesis. let chain_spec = (*SCROLL_DEV).clone(); + let mut config = default_sequencer_test_scroll_rollup_node_config(); + config.sequencer_args.block_time = 40; + // Setup the rollup node manager. - let config = default_sequencer_test_scroll_rollup_node_config(); let (mut nodes, _tasks, _) = setup_engine(config, 2, chain_spec.clone(), false, false).await.unwrap(); let _sequencer = nodes.pop().unwrap(); @@ -1148,6 +1170,7 @@ async fn signer_rotation() -> eyre::Result<()> { sequencer_1_config.consensus_args.algorithm = ConsensusAlgorithm::SystemContract; sequencer_1_config.consensus_args.authorized_signer = Some(signer_1_address); sequencer_1_config.signer_args.private_key = Some(signer_1); + sequencer_1_config.sequencer_args.block_time = 40; sequencer_1_config.network_args.enable_eth_scroll_wire_bridge = false; let mut sequencer_2_config = default_sequencer_test_scroll_rollup_node_config(); @@ -1155,6 +1178,7 @@ async fn signer_rotation() -> eyre::Result<()> { sequencer_2_config.consensus_args.algorithm = ConsensusAlgorithm::SystemContract; sequencer_2_config.consensus_args.authorized_signer = Some(signer_1_address); sequencer_2_config.signer_args.private_key = Some(signer_2); + sequencer_2_config.sequencer_args.block_time = 40; sequencer_2_config.network_args.enable_eth_scroll_wire_bridge = false; // Setup two sequencer nodes. @@ -1257,12 +1281,179 @@ pub fn read_to_bytes>(path: P) -> eyre::Result Ok(Bytes::from_str(&std::fs::read_to_string(path)?)?) } +async fn latest_block( + node: &NodeHelperType< + ScrollRollupNode, + BlockchainProvider>, + >, +) -> eyre::Result> { + node.rpc + .inner + .eth_api() + .block_by_number(BlockNumberOrTag::Latest, false) + .await? + .ok_or_else(|| eyre::eyre!("Latest block not found")) +} + +async fn wait_for_block_sequenced( + events: &mut EventStream, + block_number: u64, + timeout: Duration, +) -> eyre::Result { + let mut block = None; + + wait_for_event_predicate( + events, + |e| { + if let RollupManagerEvent::BlockSequenced(b) = e { + if b.header.number == block_number { + block = Some(b); + return true; + } + } + + false + }, + timeout, + ) + .await?; + + block.ok_or_else(|| eyre::eyre!("Block with number {block_number} was not sequenced")) +} + +async fn wait_for_block_sequenced_5s( + events: &mut EventStream, + block_number: u64, +) -> eyre::Result { + wait_for_block_sequenced(events, block_number, Duration::from_secs(5)).await +} + +async fn wait_for_block_imported( + events: &mut EventStream, + block_number: u64, + timeout: Duration, +) -> eyre::Result { + let mut block = None; + + wait_for_event_predicate( + events, + |e| { + if let RollupManagerEvent::BlockImported(b) = e { + if b.header.number == block_number { + block = Some(b); + return true; + } + } + + false + }, + timeout, + ) + .await?; + + block.ok_or_else(|| eyre::eyre!("Block with number {block_number} was not imported")) +} + +async fn wait_for_block_imported_5s( + events: &mut EventStream, + block_number: u64, +) -> eyre::Result { + wait_for_block_imported(events, block_number, Duration::from_secs(5)).await +} + +async fn wait_for_chain_committed_5s( + events: &mut EventStream, + expected_block_number: u64, + expected_consolidated: bool, +) -> eyre::Result<()> { + wait_for_chain_committed( + events, + expected_block_number, + expected_consolidated, + Duration::from_secs(5), + ) + .await +} + +async fn wait_for_chain_committed( + events: &mut EventStream, + expected_block_number: u64, + expected_consolidated: bool, + timeout: Duration, +) -> eyre::Result<()> { + wait_for_event_predicate( + events, + |e| { + if let RollupManagerEvent::ChainOrchestratorEvent( + ChainOrchestratorEvent::L2ChainCommitted(block_info, _, consolidated), + ) = e + { + return block_info.block_info.number == expected_block_number && + expected_consolidated == consolidated; + } + + false + }, + timeout, + ) + .await +} + +async fn wait_for_event_predicate( + event_stream: &mut EventStream, + mut predicate: impl FnMut(RollupManagerEvent) -> bool, + timeout: Duration, +) -> eyre::Result<()> { + let sleep = tokio::time::sleep(timeout); + tokio::pin!(sleep); + + loop { + tokio::select! { + maybe_event = event_stream.next() => { + match maybe_event { + Some(e) if predicate(e.clone()) => { + return Ok(()); + } + Some(e) => { + tracing::debug!(target: "TODO:nodeX", "ignoring event {:?}", e); + }, // Ignore other events + None => return Err(eyre::eyre!("Event stream ended unexpectedly")), + } + } + _ = &mut sleep => return Err(eyre::eyre!("Timeout while waiting for event")), + } + } +} + +async fn wait_for_event_predicate_5s( + event_stream: &mut EventStream, + predicate: impl FnMut(RollupManagerEvent) -> bool, +) -> eyre::Result<()> { + wait_for_event_predicate(event_stream, predicate, Duration::from_secs(5)).await +} + +async fn wait_for_event( + event_stream: &mut EventStream, + event: RollupManagerEvent, + timeout: Duration, +) -> eyre::Result<()> { + wait_for_event_predicate(event_stream, |e| e == event, timeout).await +} + +async fn wait_for_event_5s( + event_stream: &mut EventStream, + event: RollupManagerEvent, +) -> eyre::Result<()> { + wait_for_event(event_stream, event, Duration::from_secs(5)).await +} + /// Waits for n events to be emitted. async fn wait_n_events( events: &mut EventStream, - matches: impl Fn(RollupManagerEvent) -> bool, + mut matches: impl FnMut(RollupManagerEvent) -> bool, mut n: u64, ) { + // TODO: refactor using `wait_for_event_predicate` while let Some(event) = events.next().await { if matches(event) { n -= 1; @@ -1272,3 +1463,61 @@ async fn wait_n_events( } } } + +/// Helper function to wait until a predicate is true or a timeout occurs. +pub async fn eventually(timeout: Duration, tick: Duration, message: &str, mut predicate: F) +where + F: FnMut() -> Fut, + Fut: std::future::Future, +{ + let mut interval = time::interval(tick); + let start = time::Instant::now(); + loop { + if predicate().await { + return; + } + + assert!(start.elapsed() <= timeout, "Timeout while waiting for condition: {message}"); + + interval.tick().await; + } +} + +async fn assert_latest_block_on_rpc_by_number( + node: &NodeHelperType< + ScrollRollupNode, + BlockchainProvider>, + >, + block_number: u64, +) { + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Waiting for latest block by number on node", + || async { + println!( + "Latest block number: {}, hash: {}", + latest_block(node).await.unwrap().header.number, + latest_block(node).await.unwrap().header.hash_slow() + ); + latest_block(node).await.unwrap().header.number == block_number + }, + ) + .await; +} + +async fn assert_latest_block_on_rpc_by_hash( + node: &NodeHelperType< + ScrollRollupNode, + BlockchainProvider>, + >, + block_hash: B256, +) { + eventually( + Duration::from_secs(5), + Duration::from_millis(100), + "Waiting for latest block by hash on node", + || async { latest_block(node).await.unwrap().header.hash_slow() == block_hash }, + ) + .await; +} diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index f7b2487d..cae5a84f 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -97,7 +97,8 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> { async fn test_should_trigger_pipeline_sync_for_execution_node() -> eyre::Result<()> { reth_tracing::init_test_tracing(); let node_config = default_test_scroll_rollup_node_config(); - let sequencer_node_config = default_sequencer_test_scroll_rollup_node_config(); + let mut sequencer_node_config = default_sequencer_test_scroll_rollup_node_config(); + sequencer_node_config.sequencer_args.block_time = 40; // Create the chain spec for scroll mainnet with Feynman activated and a test genesis. let chain_spec = (*SCROLL_DEV).clone(); diff --git a/crates/signer/src/event.rs b/crates/signer/src/event.rs index 5d28886d..83324429 100644 --- a/crates/signer/src/event.rs +++ b/crates/signer/src/event.rs @@ -2,7 +2,7 @@ use alloy_signer::Signature; use reth_scroll_primitives::ScrollBlock; /// An enum representing the events that can be emitted by the signer. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum SignerEvent { /// A block has been signed by the signer. SignedBlock {