diff --git a/lib/src/chain/blocks_tree.rs b/lib/src/chain/blocks_tree.rs index 62d5366fd9..35682bbd0e 100644 --- a/lib/src/chain/blocks_tree.rs +++ b/lib/src/chain/blocks_tree.rs @@ -234,6 +234,39 @@ impl NonFinalizedTree { self.blocks_trigger_gp_change.clear(); } + /// Updates the consensus algorithm of the finalized block. + /// + /// This is useful when the chain starts with `Unknown` consensus and later + /// discovers the actual consensus parameters (e.g. Aura authorities). + /// + /// Non-finalized blocks that were verified under the old consensus are cleared, + /// as they may have been verified with incorrect rules. + pub fn set_finalized_consensus( + &mut self, + consensus: chain_information::ChainInformationConsensus, + ) { + self.finalized_consensus = match consensus { + chain_information::ChainInformationConsensus::Unknown => FinalizedConsensus::Unknown, + chain_information::ChainInformationConsensus::Aura { + finalized_authorities_list, + slot_duration, + } => FinalizedConsensus::Aura { + authorities_list: Arc::new(finalized_authorities_list), + slot_duration, + }, + chain_information::ChainInformationConsensus::Babe { + finalized_block_epoch_information, + finalized_next_epoch_transition, + slots_per_epoch, + } => FinalizedConsensus::Babe { + slots_per_epoch, + block_epoch_information: finalized_block_epoch_information.map(Arc::from), + next_epoch_transition: Arc::from(finalized_next_epoch_transition), + }, + }; + self.clear(); + } + /// Returns true if there isn't any non-finalized block in the chain. pub fn is_empty(&self) -> bool { self.blocks.is_empty() diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index eb9a128feb..e0a32821c0 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -243,6 +243,20 @@ impl AllSync { all_forks.as_chain_information() } + /// Updates the consensus algorithm of the finalized block in-place. + /// + /// Clears any non-finalized blocks, as they may have been verified with + /// incorrect consensus rules. Peers and requests are unaffected. + pub fn set_finalized_consensus( + &mut self, + consensus: chain_information::ChainInformationConsensus, + ) { + let Some(all_forks) = &mut self.all_forks else { + unreachable!() + }; + all_forks.set_finalized_consensus(consensus); + } + /// Returns the current status of the syncing. pub fn status(&'_ self) -> Status<'_, TSrc> { // TODO: diff --git a/lib/src/sync/all_forks.rs b/lib/src/sync/all_forks.rs index 7629a880bf..d7166d0d8a 100644 --- a/lib/src/sync/all_forks.rs +++ b/lib/src/sync/all_forks.rs @@ -445,6 +445,16 @@ impl AllForksSync { self.chain.as_chain_information() } + /// Updates the consensus algorithm of the finalized block. + /// + /// See [`blocks_tree::NonFinalizedTree::set_finalized_consensus`] for details. + pub fn set_finalized_consensus( + &mut self, + consensus: chain_information::ChainInformationConsensus, + ) { + self.chain.set_finalized_consensus(consensus); + } + /// Returns the header of the finalized block. pub fn finalized_block_header(&self) -> &[u8] { self.chain.finalized_block_header() diff --git a/light-base/src/sync_service/parachain.rs b/light-base/src/sync_service/parachain.rs index 9a9dda8272..553fdda5b6 100644 --- a/light-base/src/sync_service/parachain.rs +++ b/light-base/src/sync_service/parachain.rs @@ -68,33 +68,9 @@ pub(super) async fn start_parachain( ) ); - // Phase 2: Download the parachain runtime from a P2P peer and determine Aura - // consensus parameters. Retries indefinitely until successful. - let (effective_chain_info, finalized_runtime) = loop { - match bootstrap_parachain_consensus( - &log_target, - &platform, - &network_service, - &effective_chain_info, - block_number_bytes, - ) - .await - { - Ok(b) => break (b.chain_info, b.finalized_runtime), - Err(err) => { - log!( - &platform, - Warn, - &log_target, - format!("Failed to bootstrap parachain consensus: {err}. Retrying in 5s...") - ); - platform.sleep(Duration::from_secs(5)).await; - } - } - }; - // Phase 3: Spawn the paraheads background service that tracks relay chain // finalization and reports finalized parachain blocks. + // This only needs the Phase 1 result (the finalized header), not Phase 2. let (to_paraheads, from_paraheads) = async_channel::bounded(16); let from_paraheads = Box::pin(from_paraheads); @@ -143,7 +119,8 @@ pub(super) async fn start_parachain( as future::BoxFuture<'static, super::SubscribeAll>) }; - // Phase 4: Create AllSync with Aura consensus from the bootstrapped chain information. + // Phase 4: Create AllSync with Unknown consensus from the Phase 1 chain information. + // Consensus will be upgraded to Aura once bootstrap_future completes. let mut task = Task { sync: Some(all::AllSync::new(all::Config { chain_information: effective_chain_info, @@ -168,7 +145,11 @@ pub(super) async fn start_parachain( paraheads_notifications: None, pending_parachain_finalization: None, network_up_to_date_best: true, - known_finalized_runtime: Some(finalized_runtime), + known_finalized_runtime: None, + bootstrap_future: None, + bootstrap_peer_tx: None, + bootstrap_retry_sleep: None, + block_number_bytes, pending_requests: stream::FuturesUnordered::new(), all_notifications: Vec::>::new(), log_target, @@ -185,6 +166,9 @@ pub(super) async fn start_parachain( platform, }; + // Phase 2 (concurrent): Start bootstrap in the background. + task.start_bootstrap(); + // Phase 5: Main sync loop. loop { // Yield at every loop in order to provide better tasks granularity. @@ -203,6 +187,8 @@ pub(super) async fn start_parachain( ParaheadSubscribed(super::SubscribeAll), ParaheadNotification(super::Notification), ParaheadSubscriptionDead, + BootstrapComplete(Result), + BootstrapRetryReady, } let wake_up_reason = { @@ -257,6 +243,21 @@ pub(super) async fn start_parachain( future::pending().await } }) + .or(async { + if let Some(f) = task.bootstrap_future.as_mut() { + WakeUpReason::BootstrapComplete(f.await) + } else { + future::pending().await + } + }) + .or(async { + if let Some(sleep) = task.bootstrap_retry_sleep.as_mut() { + sleep.await; + WakeUpReason::BootstrapRetryReady + } else { + future::pending().await + } + }) .or({ let sync = &mut task.sync; async move { @@ -370,16 +371,20 @@ pub(super) async fn start_parachain( ?error ); - log!( - &task.platform, - Warn, - &task.log_target, - format!( - "Error while verifying header {}: {}", - HashDisplay(&verified_hash), - error - ) - ); + // Suppress noisy Warn logs during bootstrap — verification failures + // are expected when consensus is still Unknown. + if task.bootstrap_future.is_none() && task.bootstrap_retry_sleep.is_none() { + log!( + &task.platform, + Warn, + &task.log_target, + format!( + "Error while verifying header {}: {}", + HashDisplay(&verified_hash), + error + ) + ); + } } } } @@ -390,6 +395,10 @@ pub(super) async fn start_parachain( best_block_number, best_block_hash, }) => { + // If bootstrap is waiting for a peer, send this one. + if let Some(tx) = task.bootstrap_peer_tx.take() { + let _ = tx.send(peer_id.clone()); + } task.peers_source_id_map.insert( peer_id.clone(), task.sync @@ -1028,6 +1037,48 @@ pub(super) async fn start_parachain( .. }) => {} + WakeUpReason::BootstrapComplete(Ok(bootstrapped)) => { + task.bootstrap_future = None; + + log!( + &task.platform, + Info, + &task.log_target, + "Parachain consensus bootstrapped — upgrading to Aura" + ); + + task.sync + .as_mut() + .unwrap_or_else(|| unreachable!()) + .set_finalized_consensus( + chain::chain_information::ChainInformationConsensus::Aura { + finalized_authorities_list: bootstrapped.aura_authorities, + slot_duration: bootstrapped.aura_slot_duration, + }, + ); + task.known_finalized_runtime = Some(bootstrapped.finalized_runtime); + task.network_up_to_date_best = false; + } + + WakeUpReason::BootstrapComplete(Err(err)) => { + log!( + &task.platform, + Warn, + &task.log_target, + format!("Failed to bootstrap parachain consensus: {err}. Retrying in 5s...") + ); + task.bootstrap_future = None; + let platform = task.platform.clone(); + task.bootstrap_retry_sleep = Some(Box::pin(async move { + platform.sleep(Duration::from_secs(5)).await; + })); + } + + WakeUpReason::BootstrapRetryReady => { + task.bootstrap_retry_sleep = None; + task.start_bootstrap(); + } + // Unreachable variants - parachains don't use warp sync, finality proofs, or Grandpa WakeUpReason::NetworkEvent( network_service::Event::GrandpaNeighborPacket { .. } @@ -1058,6 +1109,21 @@ struct Task { /// If `Some`, contains the runtime of the current finalized block. known_finalized_runtime: Option, + /// Future resolving to the Phase 2 bootstrap result. `None` once bootstrap + /// is complete and consensus has been upgraded to Aura. + bootstrap_future: Option>>, + + /// Oneshot sender to provide a peer ID to the bootstrap future. + /// `None` once a peer has been sent or bootstrap is complete. + bootstrap_peer_tx: Option>, + + /// When bootstrap fails, a retry sleep future. When it resolves, a new + /// `bootstrap_future` is created. + bootstrap_retry_sleep: Option>, + + /// Number of bytes used to encode block numbers in headers. + block_number_bytes: usize, + /// For each networking peer, the index of the corresponding peer within the sync. peers_source_id_map: HashMap, @@ -1090,6 +1156,36 @@ enum RequestOutcome { } impl Task { + fn start_bootstrap(&mut self) { + let log_target = self.log_target.clone(); + let platform = self.platform.clone(); + let network_service = self.network_service.clone(); + let finalized_header_bytes = self + .sync + .as_ref() + .unwrap_or_else(|| unreachable!()) + .finalized_block_header() + .to_vec(); + let block_number_bytes = self.block_number_bytes; + let (tx, rx) = oneshot::channel(); + self.bootstrap_peer_tx = Some(tx); + if let Some(peer_id) = self.peers_source_id_map.keys().next().cloned() { + let _ = self.bootstrap_peer_tx.take().unwrap().send(peer_id); + } + self.bootstrap_future = Some(Box::pin(async move { + let peer_id = rx.await.map_err(|_| String::from("Peer channel closed"))?; + bootstrap_parachain_consensus( + &log_target, + &platform, + &network_service, + peer_id, + &finalized_header_bytes, + block_number_bytes, + ) + .await + })); + } + fn dispatch_all_subscribers(&mut self, notification: Notification) { for index in (0..self.all_notifications.len()).rev() { let subscription = self.all_notifications.swap_remove(index); @@ -1228,59 +1324,38 @@ async fn fetch_parachain_head_from_relay( } struct BootstrappedParachain { - chain_info: chain::chain_information::ValidChainInformation, + aura_authorities: Vec, + aura_slot_duration: NonZero, finalized_runtime: FinalizedBlockRuntime, } /// Downloads the parachain runtime from a P2P peer and determines Aura consensus parameters. +/// +/// The caller provides a connected `peer_id` to use for network requests. async fn bootstrap_parachain_consensus( log_target: &str, platform: &TPlat, network_service: &Arc>, - chain_info: &chain::chain_information::ValidChainInformation, + peer_id: libp2p::PeerId, + finalized_header_bytes: &[u8], block_number_bytes: usize, ) -> Result { - let ci_ref = chain_info.as_ref(); - let state_root = *ci_ref.finalized_block_header.state_root; - let block_hash = ci_ref.finalized_block_header.hash(block_number_bytes); + let decoded_header = header::decode(finalized_header_bytes, block_number_bytes) + .map_err(|e| format!("Failed to decode finalized header: {e}"))?; + let state_root = *decoded_header.state_root; + let block_hash = header::hash_from_scale_encoded_header(finalized_header_bytes); log!( platform, Info, log_target, format!( - "Bootstrapping parachain consensus from block #{} ({})", - ci_ref.finalized_block_header.number, + "Bootstrapping parachain consensus from block #{} ({}) using peer {peer_id}", + decoded_header.number, HashDisplay(&block_hash) ) ); - // Wait for a peer to connect. - let peer_id = { - let mut from_network = Box::pin(network_service.subscribe().await); - - if let Some(peer) = network_service.peers_list().await.next() { - peer - } else { - loop { - match from_network.next().await { - Some(network_service::Event::Connected { peer_id, .. }) => break peer_id, - Some(_) => continue, - None => { - from_network = Box::pin(network_service.subscribe().await); - } - } - } - } - }; - - log!( - platform, - Info, - log_target, - format!("Downloading parachain runtime from peer {peer_id}") - ); - // Download :code and :heappages. let proof = network_service .clone() @@ -1413,20 +1488,9 @@ async fn bootstrap_parachain_consensus( ) ); - let new_chain_info = chain::chain_information::ChainInformation { - finalized_block_header: Box::new(ci_ref.finalized_block_header.into()), - consensus: chain::chain_information::ChainInformationConsensus::Aura { - finalized_authorities_list: authorities, - slot_duration, - }, - finality: chain::chain_information::ChainInformationFinality::Outsourced, - }; - - let chain_info = chain::chain_information::ValidChainInformation::try_from(new_chain_info) - .map_err(|e| format!("Invalid chain information: {e}"))?; - Ok(BootstrappedParachain { - chain_info, + aura_authorities: authorities, + aura_slot_duration: slot_duration, finalized_runtime: FinalizedBlockRuntime { virtual_machine: vm, storage_code: Some(code),