Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 86 additions & 26 deletions light-base/src/sync_service/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,17 @@ impl<TPlat: PlatformRef> Task<TPlat> {
}
}

// Fetch the included parachain head from a finalized relay chain block.
// Fetch the included parachain head from a relay chain block.
//
// Bootstraps optimistically against the relay's current best block (or
// finalized fallback) instead of waiting for a NEW Notification::Finalized.
// Trades a strict-finality guarantee on the parahead pointer for a
// significantly lower cold-start latency: the previous "wait for next
// finalized notification" behavior blocked here for as long as it took a
// peer to gossip a GRANDPA commit message after warp-sync — empirically
// 0.6–67 s on Paseo. Subsequent finalized notifications still flow through
// the runtime_service and reorg the parachain if the chosen relay block
// was wrong.
async fn fetch_parachain_head_from_relay<TPlat: PlatformRef>(
log_target: &str,
platform: &TPlat,
Expand All @@ -1117,54 +1127,58 @@ async fn fetch_parachain_head_from_relay<TPlat: PlatformRef>(
.subscribe_all(32, NonZero::<usize>::new(usize::MAX).unwrap())
.await;

// Pick the relay's current best block from the initial subscription
// state. `non_finalized_blocks_ancestry_order` has exactly one entry
// with `is_new_best == true` whenever any non-finalized blocks are
// tracked. Otherwise the finalized block IS the current best —
// typically the warp-sync target on cold start.
let initial_hash = subscription
.non_finalized_blocks_ancestry_order
.iter()
.find(|b| b.is_new_best)
.map(|b| header::hash_from_scale_encoded_header(&b.scale_encoded_header))
.unwrap_or_else(|| {
header::hash_from_scale_encoded_header(
&subscription.finalized_block_scale_encoded_header,
)
});

log!(
platform,
Info,
log_target,
"Waiting for relay chain to finalize a block..."
"Bootstrapping parachain from current relay tip"
);

let mut next_hash = initial_hash;
loop {
let finalized_hash = loop {
match subscription.new_blocks.next().await {
Some(runtime_service::Notification::Finalized { hash, .. }) => {
break hash;
}
Some(_) => continue,
None => {
// Subscription died. Re-subscribe.
subscription = relay_chain_sync
.subscribe_all(32, NonZero::<usize>::new(usize::MAX).unwrap())
.await;
break header::hash_from_scale_encoded_header(
&subscription.finalized_block_scale_encoded_header,
);
}
}
};
let block_hash = next_hash;

log!(
platform,
Debug,
log_target,
format!(
"Trying to fetch parachain head from relay block {}",
HashDisplay(&finalized_hash)
HashDisplay(&block_hash)
)
);

let pinned = relay_chain_sync
.pin_pinned_block_runtime(subscription.new_blocks.id(), finalized_hash)
.pin_pinned_block_runtime(subscription.new_blocks.id(), block_hash)
.await;
let (pinned_runtime, block_state_trie_root, block_number) = match pinned {
Ok(v) => v,
Err(_) => continue,
Err(_) => {
next_hash = next_relay_block(&mut subscription, relay_chain_sync).await;
continue;
}
};

let call_result = relay_chain_sync
.runtime_call(
pinned_runtime,
finalized_hash,
block_hash,
block_number,
block_state_trie_root,
String::from(para::PERSISTED_VALIDATION_FUNCTION_NAME),
Expand All @@ -1184,23 +1198,32 @@ async fn fetch_parachain_head_from_relay<TPlat: PlatformRef>(
.await;
let success = match call_result {
Ok(s) => s,
Err(_) => continue,
Err(_) => {
next_hash = next_relay_block(&mut subscription, relay_chain_sync).await;
continue;
}
};

let pvd = match para::decode_persisted_validation_data_return_value(
&success.output,
relay_chain_sync.block_number_bytes(),
) {
Ok(Some(pvd)) => pvd,
_ => continue,
_ => {
next_hash = next_relay_block(&mut subscription, relay_chain_sync).await;
continue;
}
};

let parachain_header_bytes = pvd.parent_head.to_vec();
// `parent_head` is documented as opaque data, but for chains built on Cumulus (the vast
// majority) it is a SCALE-encoded block header.
let decoded_header = match header::decode(&parachain_header_bytes, block_number_bytes) {
Ok(h) => h,
Err(_) => continue,
Err(_) => {
next_hash = next_relay_block(&mut subscription, relay_chain_sync).await;
continue;
}
};

log!(
Expand Down Expand Up @@ -1232,6 +1255,43 @@ struct BootstrappedParachain {
finalized_runtime: FinalizedBlockRuntime,
}

// Wait for the next usable relay block from the subscription. Accepts
// best-block changes (`BestBlockChanged`, `Block` with `is_new_best`) in
// addition to finalized notifications, so retries on optimistic-bootstrap
// failure don't sit on `subscription.new_blocks.next()` waiting for the
// next GRANDPA commit. On subscription death, re-subscribes and returns
// the new subscription's current best (or finalized) block hash.
async fn next_relay_block<TPlat: PlatformRef>(
subscription: &mut runtime_service::SubscribeAll<TPlat>,
relay_chain_sync: &Arc<runtime_service::RuntimeService<TPlat>>,
) -> [u8; 32] {
loop {
match subscription.new_blocks.next().await {
Some(runtime_service::Notification::Finalized { hash, .. }) => return hash,
Some(runtime_service::Notification::BestBlockChanged { hash }) => return hash,
Some(runtime_service::Notification::Block(b)) if b.is_new_best => {
return header::hash_from_scale_encoded_header(&b.scale_encoded_header);
}
Some(_) => continue,
None => {
*subscription = relay_chain_sync
.subscribe_all(32, NonZero::<usize>::new(usize::MAX).unwrap())
.await;
return subscription
.non_finalized_blocks_ancestry_order
.iter()
.find(|b| b.is_new_best)
.map(|b| header::hash_from_scale_encoded_header(&b.scale_encoded_header))
.unwrap_or_else(|| {
header::hash_from_scale_encoded_header(
&subscription.finalized_block_scale_encoded_header,
)
});
}
}
}
}

/// Downloads the parachain runtime from a P2P peer and determines Aura consensus parameters.
async fn bootstrap_parachain_consensus<TPlat: PlatformRef>(
log_target: &str,
Expand Down
Loading