From eabc7871f7735703708a129abd3d5f5c4703b660 Mon Sep 17 00:00:00 2001 From: w Date: Wed, 22 Apr 2026 18:23:17 -0400 Subject: [PATCH 1/3] refactor(sync-service): convert start_parachain to reactive state machine The main sync loop now starts immediately after fetching the parachain head from the relay chain (Phase 1). Runtime bootstrap (Phase 2) runs concurrently as a future inside the event loop rather than blocking before it. This means foreground messages (SubscribeAll, IsNearHeadOfChainHeuristic, etc.), network events, and peer management are all responsive while the runtime is being downloaded and compiled. When bootstrap completes, AllSync is rebuilt with Aura consensus using the current finalized header (not the stale one from bootstrap start), and all tracked peers are re-added. Closes #3222 --- light-base/src/sync_service/parachain.rs | 260 ++++++++++++++++++----- 1 file changed, 206 insertions(+), 54 deletions(-) diff --git a/light-base/src/sync_service/parachain.rs b/light-base/src/sync_service/parachain.rs index 9a9dda8272..36d78f47a6 100644 --- a/light-base/src/sync_service/parachain.rs +++ b/light-base/src/sync_service/parachain.rs @@ -68,33 +68,33 @@ pub(super) async fn start_parachain( ) ); - // Phase 2: Download the parachain runtime from a P2P peer and determine Aura - // consensus parameters. Retries indefinitely until successful. - let (effective_chain_info, finalized_runtime) = loop { - match bootstrap_parachain_consensus( - &log_target, - &platform, - &network_service, - &effective_chain_info, - block_number_bytes, - ) - .await - { - Ok(b) => break (b.chain_info, b.finalized_runtime), - Err(err) => { - log!( - &platform, - Warn, - &log_target, - format!("Failed to bootstrap parachain consensus: {err}. Retrying in 5s...") - ); - platform.sleep(Duration::from_secs(5)).await; - } - } + // Phase 2 (concurrent): Build a future to bootstrap consensus in the background. + // This runs concurrently inside the main loop rather than blocking here. + let bootstrap_future: Option< + future::BoxFuture<'static, Result>, + > = { + let log_target = log_target.clone(); + let platform = platform.clone(); + let network_service = network_service.clone(); + let finalized_header_bytes = effective_chain_info + .as_ref() + .finalized_block_header + .scale_encoding_vec(block_number_bytes); + Some(Box::pin(async move { + bootstrap_parachain_consensus( + &log_target, + &platform, + &network_service, + &finalized_header_bytes, + block_number_bytes, + ) + .await + })) }; // Phase 3: Spawn the paraheads background service that tracks relay chain // finalization and reports finalized parachain blocks. + // This only needs the Phase 1 result (the finalized header), not Phase 2. let (to_paraheads, from_paraheads) = async_channel::bounded(16); let from_paraheads = Box::pin(from_paraheads); @@ -143,7 +143,8 @@ pub(super) async fn start_parachain( as future::BoxFuture<'static, super::SubscribeAll>) }; - // Phase 4: Create AllSync with Aura consensus from the bootstrapped chain information. + // Phase 4: Create AllSync with Unknown consensus from the Phase 1 chain information. + // Consensus will be upgraded to Aura once bootstrap_future completes. let mut task = Task { sync: Some(all::AllSync::new(all::Config { chain_information: effective_chain_info, @@ -168,7 +169,11 @@ pub(super) async fn start_parachain( paraheads_notifications: None, pending_parachain_finalization: None, network_up_to_date_best: true, - known_finalized_runtime: Some(finalized_runtime), + // Runtime is not yet known — will be set once bootstrap_future resolves. + known_finalized_runtime: None, + bootstrap_future, + bootstrap_retry_sleep: None, + block_number_bytes, pending_requests: stream::FuturesUnordered::new(), all_notifications: Vec::>::new(), log_target, @@ -203,6 +208,8 @@ pub(super) async fn start_parachain( ParaheadSubscribed(super::SubscribeAll), ParaheadNotification(super::Notification), ParaheadSubscriptionDead, + BootstrapComplete(Result), + BootstrapRetryReady, } let wake_up_reason = { @@ -257,6 +264,21 @@ pub(super) async fn start_parachain( future::pending().await } }) + .or(async { + if let Some(f) = task.bootstrap_future.as_mut() { + WakeUpReason::BootstrapComplete(f.await) + } else { + future::pending().await + } + }) + .or(async { + if let Some(sleep) = task.bootstrap_retry_sleep.as_mut() { + sleep.await; + WakeUpReason::BootstrapRetryReady + } else { + future::pending().await + } + }) .or({ let sync = &mut task.sync; async move { @@ -370,16 +392,20 @@ pub(super) async fn start_parachain( ?error ); - log!( - &task.platform, - Warn, - &task.log_target, - format!( - "Error while verifying header {}: {}", - HashDisplay(&verified_hash), - error - ) - ); + // Suppress noisy Warn logs during bootstrap — verification failures + // are expected when consensus is still Unknown. + if task.bootstrap_future.is_none() && task.bootstrap_retry_sleep.is_none() { + log!( + &task.platform, + Warn, + &task.log_target, + format!( + "Error while verifying header {}: {}", + HashDisplay(&verified_hash), + error + ) + ); + } } } } @@ -1028,6 +1054,129 @@ pub(super) async fn start_parachain( .. }) => {} + WakeUpReason::BootstrapComplete(Ok(bootstrapped)) => { + task.bootstrap_future = None; + + log!( + &task.platform, + Info, + &task.log_target, + "Parachain consensus bootstrapped — upgrading to Aura" + ); + + // Build chain information using the *current* finalized header + // (which may have advanced via paraheads since bootstrap started) + // combined with the Aura consensus parameters from the bootstrap. + let sync = task.sync.as_ref().unwrap(); + let current_finalized_header = + header::decode(sync.finalized_block_header(), task.block_number_bytes).unwrap(); + let chain_info = chain::chain_information::ChainInformation { + finalized_block_header: Box::new(current_finalized_header.into()), + consensus: chain::chain_information::ChainInformationConsensus::Aura { + finalized_authorities_list: bootstrapped.aura_authorities, + slot_duration: bootstrapped.aura_slot_duration, + }, + finality: chain::chain_information::ChainInformationFinality::Outsourced, + }; + let chain_info = + chain::chain_information::ValidChainInformation::try_from(chain_info) + .expect("current finalized header with Aura consensus must be valid"); + + // Collect existing peers and their best blocks, then abort all + // in-flight requests before rebuilding AllSync. + let sync = task.sync.as_mut().unwrap(); + let peer_info: Vec<_> = task + .peers_source_id_map + .drain() + .map(|(peer_id, source_id)| { + let (best_number, best_hash) = sync.source_best_block(source_id); + let role = sync[source_id].1; + (peer_id, role, best_number, *best_hash, source_id) + }) + .collect(); + for &(_, _, _, _, source_id) in &peer_info { + let (_, requests) = sync.remove_source(source_id); + for (_, abort) in requests { + abort.abort(); + } + } + let existing_peers: Vec<_> = peer_info + .into_iter() + .map(|(peer_id, role, best_number, best_hash, _)| { + (peer_id, role, best_number, best_hash) + }) + .collect(); + + // Drop the old AllSync. Any remaining pending_requests futures + // will resolve as Aborted and be harmlessly ignored. + task.sync = None; + + // Rebuild AllSync with Aura consensus using the current finalized header. + let mut new_sync = all::AllSync::new(all::Config { + chain_information: chain_info, + block_number_bytes: task.block_number_bytes, + allow_unknown_consensus_engines: true, + sources_capacity: 32, + blocks_capacity: 1024, + max_disjoint_headers: 1024, + max_requests_per_block: NonZero::::new(3).unwrap(), + download_ahead_blocks: NonZero::::new(5000).unwrap(), + download_bodies: false, + download_all_chain_information_storage_proofs: false, + code_trie_node_hint: None, + }); + + // Re-add all previously tracked peers. + for (peer_id, role, best_number, best_hash) in existing_peers { + let source_id = new_sync + .prepare_add_source(best_number, best_hash) + .add_source((peer_id.clone(), role), ()); + task.peers_source_id_map.insert(peer_id, source_id); + } + + task.sync = Some(new_sync); + task.known_finalized_runtime = Some(bootstrapped.finalized_runtime); + task.network_up_to_date_best = false; + } + + WakeUpReason::BootstrapComplete(Err(err)) => { + log!( + &task.platform, + Warn, + &task.log_target, + format!("Failed to bootstrap parachain consensus: {err}. Retrying in 5s...") + ); + task.bootstrap_future = None; + let platform = task.platform.clone(); + task.bootstrap_retry_sleep = Some(Box::pin(async move { + platform.sleep(Duration::from_secs(5)).await; + })); + } + + WakeUpReason::BootstrapRetryReady => { + task.bootstrap_retry_sleep = None; + let log_target = task.log_target.clone(); + let platform = task.platform.clone(); + let network_service = task.network_service.clone(); + let finalized_header_bytes = task + .sync + .as_ref() + .unwrap_or_else(|| unreachable!()) + .finalized_block_header() + .to_vec(); + let block_number_bytes = task.block_number_bytes; + task.bootstrap_future = Some(Box::pin(async move { + bootstrap_parachain_consensus( + &log_target, + &platform, + &network_service, + &finalized_header_bytes, + block_number_bytes, + ) + .await + })); + } + // Unreachable variants - parachains don't use warp sync, finality proofs, or Grandpa WakeUpReason::NetworkEvent( network_service::Event::GrandpaNeighborPacket { .. } @@ -1058,6 +1207,18 @@ struct Task { /// If `Some`, contains the runtime of the current finalized block. known_finalized_runtime: Option, + /// Future resolving to the Phase 2 bootstrap result. `None` once bootstrap + /// is complete and consensus has been upgraded to Aura. + bootstrap_future: Option>>, + + /// When bootstrap fails, a retry sleep future. When it resolves, a new + /// `bootstrap_future` is created. + bootstrap_retry_sleep: Option>, + + /// Number of bytes used to encode block numbers in headers. + /// Stored so it is available when rebuilding AllSync after bootstrap completes. + block_number_bytes: usize, + /// For each networking peer, the index of the corresponding peer within the sync. peers_source_id_map: HashMap, @@ -1228,7 +1389,8 @@ async fn fetch_parachain_head_from_relay( } struct BootstrappedParachain { - chain_info: chain::chain_information::ValidChainInformation, + aura_authorities: Vec, + aura_slot_duration: NonZero, finalized_runtime: FinalizedBlockRuntime, } @@ -1237,12 +1399,13 @@ async fn bootstrap_parachain_consensus( log_target: &str, platform: &TPlat, network_service: &Arc>, - chain_info: &chain::chain_information::ValidChainInformation, + finalized_header_bytes: &[u8], block_number_bytes: usize, ) -> Result { - let ci_ref = chain_info.as_ref(); - let state_root = *ci_ref.finalized_block_header.state_root; - let block_hash = ci_ref.finalized_block_header.hash(block_number_bytes); + let decoded_header = header::decode(finalized_header_bytes, block_number_bytes) + .map_err(|e| format!("Failed to decode finalized header: {e}"))?; + let state_root = *decoded_header.state_root; + let block_hash = header::hash_from_scale_encoded_header(finalized_header_bytes); log!( platform, @@ -1250,7 +1413,7 @@ async fn bootstrap_parachain_consensus( log_target, format!( "Bootstrapping parachain consensus from block #{} ({})", - ci_ref.finalized_block_header.number, + decoded_header.number, HashDisplay(&block_hash) ) ); @@ -1413,20 +1576,9 @@ async fn bootstrap_parachain_consensus( ) ); - let new_chain_info = chain::chain_information::ChainInformation { - finalized_block_header: Box::new(ci_ref.finalized_block_header.into()), - consensus: chain::chain_information::ChainInformationConsensus::Aura { - finalized_authorities_list: authorities, - slot_duration, - }, - finality: chain::chain_information::ChainInformationFinality::Outsourced, - }; - - let chain_info = chain::chain_information::ValidChainInformation::try_from(new_chain_info) - .map_err(|e| format!("Invalid chain information: {e}"))?; - Ok(BootstrappedParachain { - chain_info, + aura_authorities: authorities, + aura_slot_duration: slot_duration, finalized_runtime: FinalizedBlockRuntime { virtual_machine: vm, storage_code: Some(code), From 0d0094c603d61f39217fb47c4843a4dd4892633e Mon Sep 17 00:00:00 2001 From: w Date: Wed, 22 Apr 2026 20:08:00 -0400 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20in-pl?= =?UTF-8?q?ace=20consensus=20update,=20stale=20authority=20guard,=20peer?= =?UTF-8?q?=20passing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Add AllSync::set_finalized_consensus() (through NonFinalizedTree → AllForksSync → AllSync) to update consensus in-place. No more AllSync rebuild/peer drain/request abort dance. 2. Guard against stale Aura authorities: if finalization advanced past the bootstrap block, discard the result and re-bootstrap from the current finalized header. 3. Remove redundant network subscription from bootstrap_parachain_consensus. The main loop passes a peer via oneshot channel instead. --- lib/src/chain/blocks_tree.rs | 33 ++++ lib/src/sync/all.rs | 14 ++ lib/src/sync/all_forks.rs | 10 ++ light-base/src/sync_service/parachain.rs | 187 ++++++++++------------- 4 files changed, 139 insertions(+), 105 deletions(-) diff --git a/lib/src/chain/blocks_tree.rs b/lib/src/chain/blocks_tree.rs index 62d5366fd9..35682bbd0e 100644 --- a/lib/src/chain/blocks_tree.rs +++ b/lib/src/chain/blocks_tree.rs @@ -234,6 +234,39 @@ impl NonFinalizedTree { self.blocks_trigger_gp_change.clear(); } + /// Updates the consensus algorithm of the finalized block. + /// + /// This is useful when the chain starts with `Unknown` consensus and later + /// discovers the actual consensus parameters (e.g. Aura authorities). + /// + /// Non-finalized blocks that were verified under the old consensus are cleared, + /// as they may have been verified with incorrect rules. + pub fn set_finalized_consensus( + &mut self, + consensus: chain_information::ChainInformationConsensus, + ) { + self.finalized_consensus = match consensus { + chain_information::ChainInformationConsensus::Unknown => FinalizedConsensus::Unknown, + chain_information::ChainInformationConsensus::Aura { + finalized_authorities_list, + slot_duration, + } => FinalizedConsensus::Aura { + authorities_list: Arc::new(finalized_authorities_list), + slot_duration, + }, + chain_information::ChainInformationConsensus::Babe { + finalized_block_epoch_information, + finalized_next_epoch_transition, + slots_per_epoch, + } => FinalizedConsensus::Babe { + slots_per_epoch, + block_epoch_information: finalized_block_epoch_information.map(Arc::from), + next_epoch_transition: Arc::from(finalized_next_epoch_transition), + }, + }; + self.clear(); + } + /// Returns true if there isn't any non-finalized block in the chain. pub fn is_empty(&self) -> bool { self.blocks.is_empty() diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index eb9a128feb..e0a32821c0 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -243,6 +243,20 @@ impl AllSync { all_forks.as_chain_information() } + /// Updates the consensus algorithm of the finalized block in-place. + /// + /// Clears any non-finalized blocks, as they may have been verified with + /// incorrect consensus rules. Peers and requests are unaffected. + pub fn set_finalized_consensus( + &mut self, + consensus: chain_information::ChainInformationConsensus, + ) { + let Some(all_forks) = &mut self.all_forks else { + unreachable!() + }; + all_forks.set_finalized_consensus(consensus); + } + /// Returns the current status of the syncing. pub fn status(&'_ self) -> Status<'_, TSrc> { // TODO: diff --git a/lib/src/sync/all_forks.rs b/lib/src/sync/all_forks.rs index 7629a880bf..d7166d0d8a 100644 --- a/lib/src/sync/all_forks.rs +++ b/lib/src/sync/all_forks.rs @@ -445,6 +445,16 @@ impl AllForksSync { self.chain.as_chain_information() } + /// Updates the consensus algorithm of the finalized block. + /// + /// See [`blocks_tree::NonFinalizedTree::set_finalized_consensus`] for details. + pub fn set_finalized_consensus( + &mut self, + consensus: chain_information::ChainInformationConsensus, + ) { + self.chain.set_finalized_consensus(consensus); + } + /// Returns the header of the finalized block. pub fn finalized_block_header(&self) -> &[u8] { self.chain.finalized_block_header() diff --git a/light-base/src/sync_service/parachain.rs b/light-base/src/sync_service/parachain.rs index 36d78f47a6..115d19ac5e 100644 --- a/light-base/src/sync_service/parachain.rs +++ b/light-base/src/sync_service/parachain.rs @@ -69,7 +69,9 @@ pub(super) async fn start_parachain( ); // Phase 2 (concurrent): Build a future to bootstrap consensus in the background. - // This runs concurrently inside the main loop rather than blocking here. + // The bootstrap needs a connected peer, which the main loop provides via a + // oneshot channel when the first peer connects. + let (bootstrap_peer_tx, bootstrap_peer_rx) = oneshot::channel(); let bootstrap_future: Option< future::BoxFuture<'static, Result>, > = { @@ -81,10 +83,14 @@ pub(super) async fn start_parachain( .finalized_block_header .scale_encoding_vec(block_number_bytes); Some(Box::pin(async move { + let peer_id = bootstrap_peer_rx + .await + .map_err(|_| String::from("Peer channel closed"))?; bootstrap_parachain_consensus( &log_target, &platform, &network_service, + peer_id, &finalized_header_bytes, block_number_bytes, ) @@ -172,6 +178,7 @@ pub(super) async fn start_parachain( // Runtime is not yet known — will be set once bootstrap_future resolves. known_finalized_runtime: None, bootstrap_future, + bootstrap_peer_tx: Some(bootstrap_peer_tx), bootstrap_retry_sleep: None, block_number_bytes, pending_requests: stream::FuturesUnordered::new(), @@ -416,6 +423,10 @@ pub(super) async fn start_parachain( best_block_number, best_block_hash, }) => { + // If bootstrap is waiting for a peer, send this one. + if let Some(tx) = task.bootstrap_peer_tx.take() { + let _ = tx.send(peer_id.clone()); + } task.peers_source_id_map.insert( peer_id.clone(), task.sync @@ -1057,86 +1068,60 @@ pub(super) async fn start_parachain( WakeUpReason::BootstrapComplete(Ok(bootstrapped)) => { task.bootstrap_future = None; - log!( - &task.platform, - Info, - &task.log_target, - "Parachain consensus bootstrapped — upgrading to Aura" - ); - - // Build chain information using the *current* finalized header - // (which may have advanced via paraheads since bootstrap started) - // combined with the Aura consensus parameters from the bootstrap. - let sync = task.sync.as_ref().unwrap(); - let current_finalized_header = - header::decode(sync.finalized_block_header(), task.block_number_bytes).unwrap(); - let chain_info = chain::chain_information::ChainInformation { - finalized_block_header: Box::new(current_finalized_header.into()), - consensus: chain::chain_information::ChainInformationConsensus::Aura { - finalized_authorities_list: bootstrapped.aura_authorities, - slot_duration: bootstrapped.aura_slot_duration, - }, - finality: chain::chain_information::ChainInformationFinality::Outsourced, - }; - let chain_info = - chain::chain_information::ValidChainInformation::try_from(chain_info) - .expect("current finalized header with Aura consensus must be valid"); + let sync = task.sync.as_mut().unwrap_or_else(|| unreachable!()); - // Collect existing peers and their best blocks, then abort all - // in-flight requests before rebuilding AllSync. - let sync = task.sync.as_mut().unwrap(); - let peer_info: Vec<_> = task - .peers_source_id_map - .drain() - .map(|(peer_id, source_id)| { - let (best_number, best_hash) = sync.source_best_block(source_id); - let role = sync[source_id].1; - (peer_id, role, best_number, *best_hash, source_id) - }) - .collect(); - for &(_, _, _, _, source_id) in &peer_info { - let (_, requests) = sync.remove_source(source_id); - for (_, abort) in requests { - abort.abort(); + // If finalization advanced past the block we bootstrapped from, + // the Aura authorities may have changed. Re-bootstrap. + if *sync.finalized_block_hash() != bootstrapped.bootstrap_block_hash { + log!( + &task.platform, + Info, + &task.log_target, + "Finalized block advanced during bootstrap — re-bootstrapping" + ); + let log_target = task.log_target.clone(); + let platform = task.platform.clone(); + let network_service = task.network_service.clone(); + let finalized_header_bytes = sync.finalized_block_header().to_vec(); + let block_number_bytes = task.block_number_bytes; + let (tx, rx) = oneshot::channel(); + task.bootstrap_peer_tx = Some(tx); + // Feed an existing peer if we have one, otherwise the + // Connected handler will send one later. + if let Some(peer_id) = task.peers_source_id_map.keys().next().cloned() { + let _ = task.bootstrap_peer_tx.take().unwrap().send(peer_id); } - } - let existing_peers: Vec<_> = peer_info - .into_iter() - .map(|(peer_id, role, best_number, best_hash, _)| { - (peer_id, role, best_number, best_hash) - }) - .collect(); - - // Drop the old AllSync. Any remaining pending_requests futures - // will resolve as Aborted and be harmlessly ignored. - task.sync = None; - - // Rebuild AllSync with Aura consensus using the current finalized header. - let mut new_sync = all::AllSync::new(all::Config { - chain_information: chain_info, - block_number_bytes: task.block_number_bytes, - allow_unknown_consensus_engines: true, - sources_capacity: 32, - blocks_capacity: 1024, - max_disjoint_headers: 1024, - max_requests_per_block: NonZero::::new(3).unwrap(), - download_ahead_blocks: NonZero::::new(5000).unwrap(), - download_bodies: false, - download_all_chain_information_storage_proofs: false, - code_trie_node_hint: None, - }); + task.bootstrap_future = Some(Box::pin(async move { + let peer_id = rx + .await + .map_err(|_| String::from("Peer channel closed"))?; + bootstrap_parachain_consensus( + &log_target, + &platform, + &network_service, + peer_id, + &finalized_header_bytes, + block_number_bytes, + ) + .await + })); + } else { + log!( + &task.platform, + Info, + &task.log_target, + "Parachain consensus bootstrapped — upgrading to Aura" + ); - // Re-add all previously tracked peers. - for (peer_id, role, best_number, best_hash) in existing_peers { - let source_id = new_sync - .prepare_add_source(best_number, best_hash) - .add_source((peer_id.clone(), role), ()); - task.peers_source_id_map.insert(peer_id, source_id); + sync.set_finalized_consensus( + chain::chain_information::ChainInformationConsensus::Aura { + finalized_authorities_list: bootstrapped.aura_authorities, + slot_duration: bootstrapped.aura_slot_duration, + }, + ); + task.known_finalized_runtime = Some(bootstrapped.finalized_runtime); + task.network_up_to_date_best = false; } - - task.sync = Some(new_sync); - task.known_finalized_runtime = Some(bootstrapped.finalized_runtime); - task.network_up_to_date_best = false; } WakeUpReason::BootstrapComplete(Err(err)) => { @@ -1165,11 +1150,20 @@ pub(super) async fn start_parachain( .finalized_block_header() .to_vec(); let block_number_bytes = task.block_number_bytes; + let (tx, rx) = oneshot::channel(); + task.bootstrap_peer_tx = Some(tx); + if let Some(peer_id) = task.peers_source_id_map.keys().next().cloned() { + let _ = task.bootstrap_peer_tx.take().unwrap().send(peer_id); + } task.bootstrap_future = Some(Box::pin(async move { + let peer_id = rx + .await + .map_err(|_| String::from("Peer channel closed"))?; bootstrap_parachain_consensus( &log_target, &platform, &network_service, + peer_id, &finalized_header_bytes, block_number_bytes, ) @@ -1211,12 +1205,15 @@ struct Task { /// is complete and consensus has been upgraded to Aura. bootstrap_future: Option>>, + /// Oneshot sender to provide a peer ID to the bootstrap future. + /// `None` once a peer has been sent or bootstrap is complete. + bootstrap_peer_tx: Option>, + /// When bootstrap fails, a retry sleep future. When it resolves, a new /// `bootstrap_future` is created. bootstrap_retry_sleep: Option>, /// Number of bytes used to encode block numbers in headers. - /// Stored so it is available when rebuilding AllSync after bootstrap completes. block_number_bytes: usize, /// For each networking peer, the index of the corresponding peer within the sync. @@ -1389,16 +1386,21 @@ async fn fetch_parachain_head_from_relay( } struct BootstrappedParachain { + /// Hash of the block that was used to bootstrap consensus. + bootstrap_block_hash: [u8; 32], aura_authorities: Vec, aura_slot_duration: NonZero, finalized_runtime: FinalizedBlockRuntime, } /// Downloads the parachain runtime from a P2P peer and determines Aura consensus parameters. +/// +/// The caller provides a connected `peer_id` to use for network requests. async fn bootstrap_parachain_consensus( log_target: &str, platform: &TPlat, network_service: &Arc>, + peer_id: libp2p::PeerId, finalized_header_bytes: &[u8], block_number_bytes: usize, ) -> Result { @@ -1412,38 +1414,12 @@ async fn bootstrap_parachain_consensus( Info, log_target, format!( - "Bootstrapping parachain consensus from block #{} ({})", + "Bootstrapping parachain consensus from block #{} ({}) using peer {peer_id}", decoded_header.number, HashDisplay(&block_hash) ) ); - // Wait for a peer to connect. - let peer_id = { - let mut from_network = Box::pin(network_service.subscribe().await); - - if let Some(peer) = network_service.peers_list().await.next() { - peer - } else { - loop { - match from_network.next().await { - Some(network_service::Event::Connected { peer_id, .. }) => break peer_id, - Some(_) => continue, - None => { - from_network = Box::pin(network_service.subscribe().await); - } - } - } - } - }; - - log!( - platform, - Info, - log_target, - format!("Downloading parachain runtime from peer {peer_id}") - ); - // Download :code and :heappages. let proof = network_service .clone() @@ -1577,6 +1553,7 @@ async fn bootstrap_parachain_consensus( ); Ok(BootstrappedParachain { + bootstrap_block_hash: block_hash, aura_authorities: authorities, aura_slot_duration: slot_duration, finalized_runtime: FinalizedBlockRuntime { From 2bb4b8354d4235cbdb3b1c371274f96f3f1e5584 Mon Sep 17 00:00:00 2001 From: w Date: Wed, 22 Apr 2026 23:49:21 -0400 Subject: [PATCH 3/3] refactor: extract start_bootstrap helper, drop re-bootstrap-on-drift Extract the bootstrap future construction (oneshot channel + peer feeding + bootstrap_parachain_consensus call) into Task::start_bootstrap(). Three identical copies collapse to one. Remove the re-bootstrap-on-drift guard. Aura authority sets change at session boundaries (hours apart), not every few blocks. If the finalized block advances a few blocks during bootstrap the authorities are still valid. If they're wrong, verify_header catches it immediately. --- light-base/src/sync_service/parachain.rs | 161 +++++++---------------- 1 file changed, 48 insertions(+), 113 deletions(-) diff --git a/light-base/src/sync_service/parachain.rs b/light-base/src/sync_service/parachain.rs index 115d19ac5e..553fdda5b6 100644 --- a/light-base/src/sync_service/parachain.rs +++ b/light-base/src/sync_service/parachain.rs @@ -68,36 +68,6 @@ pub(super) async fn start_parachain( ) ); - // Phase 2 (concurrent): Build a future to bootstrap consensus in the background. - // The bootstrap needs a connected peer, which the main loop provides via a - // oneshot channel when the first peer connects. - let (bootstrap_peer_tx, bootstrap_peer_rx) = oneshot::channel(); - let bootstrap_future: Option< - future::BoxFuture<'static, Result>, - > = { - let log_target = log_target.clone(); - let platform = platform.clone(); - let network_service = network_service.clone(); - let finalized_header_bytes = effective_chain_info - .as_ref() - .finalized_block_header - .scale_encoding_vec(block_number_bytes); - Some(Box::pin(async move { - let peer_id = bootstrap_peer_rx - .await - .map_err(|_| String::from("Peer channel closed"))?; - bootstrap_parachain_consensus( - &log_target, - &platform, - &network_service, - peer_id, - &finalized_header_bytes, - block_number_bytes, - ) - .await - })) - }; - // Phase 3: Spawn the paraheads background service that tracks relay chain // finalization and reports finalized parachain blocks. // This only needs the Phase 1 result (the finalized header), not Phase 2. @@ -175,10 +145,9 @@ pub(super) async fn start_parachain( paraheads_notifications: None, pending_parachain_finalization: None, network_up_to_date_best: true, - // Runtime is not yet known — will be set once bootstrap_future resolves. known_finalized_runtime: None, - bootstrap_future, - bootstrap_peer_tx: Some(bootstrap_peer_tx), + bootstrap_future: None, + bootstrap_peer_tx: None, bootstrap_retry_sleep: None, block_number_bytes, pending_requests: stream::FuturesUnordered::new(), @@ -197,6 +166,9 @@ pub(super) async fn start_parachain( platform, }; + // Phase 2 (concurrent): Start bootstrap in the background. + task.start_bootstrap(); + // Phase 5: Main sync loop. loop { // Yield at every loop in order to provide better tasks granularity. @@ -1068,60 +1040,24 @@ pub(super) async fn start_parachain( WakeUpReason::BootstrapComplete(Ok(bootstrapped)) => { task.bootstrap_future = None; - let sync = task.sync.as_mut().unwrap_or_else(|| unreachable!()); - - // If finalization advanced past the block we bootstrapped from, - // the Aura authorities may have changed. Re-bootstrap. - if *sync.finalized_block_hash() != bootstrapped.bootstrap_block_hash { - log!( - &task.platform, - Info, - &task.log_target, - "Finalized block advanced during bootstrap — re-bootstrapping" - ); - let log_target = task.log_target.clone(); - let platform = task.platform.clone(); - let network_service = task.network_service.clone(); - let finalized_header_bytes = sync.finalized_block_header().to_vec(); - let block_number_bytes = task.block_number_bytes; - let (tx, rx) = oneshot::channel(); - task.bootstrap_peer_tx = Some(tx); - // Feed an existing peer if we have one, otherwise the - // Connected handler will send one later. - if let Some(peer_id) = task.peers_source_id_map.keys().next().cloned() { - let _ = task.bootstrap_peer_tx.take().unwrap().send(peer_id); - } - task.bootstrap_future = Some(Box::pin(async move { - let peer_id = rx - .await - .map_err(|_| String::from("Peer channel closed"))?; - bootstrap_parachain_consensus( - &log_target, - &platform, - &network_service, - peer_id, - &finalized_header_bytes, - block_number_bytes, - ) - .await - })); - } else { - log!( - &task.platform, - Info, - &task.log_target, - "Parachain consensus bootstrapped — upgrading to Aura" - ); + log!( + &task.platform, + Info, + &task.log_target, + "Parachain consensus bootstrapped — upgrading to Aura" + ); - sync.set_finalized_consensus( + task.sync + .as_mut() + .unwrap_or_else(|| unreachable!()) + .set_finalized_consensus( chain::chain_information::ChainInformationConsensus::Aura { finalized_authorities_list: bootstrapped.aura_authorities, slot_duration: bootstrapped.aura_slot_duration, }, ); - task.known_finalized_runtime = Some(bootstrapped.finalized_runtime); - task.network_up_to_date_best = false; - } + task.known_finalized_runtime = Some(bootstrapped.finalized_runtime); + task.network_up_to_date_best = false; } WakeUpReason::BootstrapComplete(Err(err)) => { @@ -1140,35 +1076,7 @@ pub(super) async fn start_parachain( WakeUpReason::BootstrapRetryReady => { task.bootstrap_retry_sleep = None; - let log_target = task.log_target.clone(); - let platform = task.platform.clone(); - let network_service = task.network_service.clone(); - let finalized_header_bytes = task - .sync - .as_ref() - .unwrap_or_else(|| unreachable!()) - .finalized_block_header() - .to_vec(); - let block_number_bytes = task.block_number_bytes; - let (tx, rx) = oneshot::channel(); - task.bootstrap_peer_tx = Some(tx); - if let Some(peer_id) = task.peers_source_id_map.keys().next().cloned() { - let _ = task.bootstrap_peer_tx.take().unwrap().send(peer_id); - } - task.bootstrap_future = Some(Box::pin(async move { - let peer_id = rx - .await - .map_err(|_| String::from("Peer channel closed"))?; - bootstrap_parachain_consensus( - &log_target, - &platform, - &network_service, - peer_id, - &finalized_header_bytes, - block_number_bytes, - ) - .await - })); + task.start_bootstrap(); } // Unreachable variants - parachains don't use warp sync, finality proofs, or Grandpa @@ -1248,6 +1156,36 @@ enum RequestOutcome { } impl Task { + fn start_bootstrap(&mut self) { + let log_target = self.log_target.clone(); + let platform = self.platform.clone(); + let network_service = self.network_service.clone(); + let finalized_header_bytes = self + .sync + .as_ref() + .unwrap_or_else(|| unreachable!()) + .finalized_block_header() + .to_vec(); + let block_number_bytes = self.block_number_bytes; + let (tx, rx) = oneshot::channel(); + self.bootstrap_peer_tx = Some(tx); + if let Some(peer_id) = self.peers_source_id_map.keys().next().cloned() { + let _ = self.bootstrap_peer_tx.take().unwrap().send(peer_id); + } + self.bootstrap_future = Some(Box::pin(async move { + let peer_id = rx.await.map_err(|_| String::from("Peer channel closed"))?; + bootstrap_parachain_consensus( + &log_target, + &platform, + &network_service, + peer_id, + &finalized_header_bytes, + block_number_bytes, + ) + .await + })); + } + fn dispatch_all_subscribers(&mut self, notification: Notification) { for index in (0..self.all_notifications.len()).rev() { let subscription = self.all_notifications.swap_remove(index); @@ -1386,8 +1324,6 @@ async fn fetch_parachain_head_from_relay( } struct BootstrappedParachain { - /// Hash of the block that was used to bootstrap consensus. - bootstrap_block_hash: [u8; 32], aura_authorities: Vec, aura_slot_duration: NonZero, finalized_runtime: FinalizedBlockRuntime, @@ -1553,7 +1489,6 @@ async fn bootstrap_parachain_consensus( ); Ok(BootstrappedParachain { - bootstrap_block_hash: block_hash, aura_authorities: authorities, aura_slot_duration: slot_duration, finalized_runtime: FinalizedBlockRuntime {