diff --git a/crates/commonware-node/src/consensus/engine.rs b/crates/commonware-node/src/consensus/engine.rs index d3f4fbdf05..00d7fd3c8e 100644 --- a/crates/commonware-node/src/consensus/engine.rs +++ b/crates/commonware-node/src/consensus/engine.rs @@ -281,7 +281,6 @@ where context.with_label("executor"), crate::executor::Config { execution_node: execution_node.clone(), - epoch_strategy: epoch_strategy.clone(), last_finalized_height, marshal: marshal_mailbox.clone(), fcu_heartbeat_interval: self.fcu_heartbeat_interval, diff --git a/crates/commonware-node/src/executor/actor.rs b/crates/commonware-node/src/executor/actor.rs index d4ae42bf98..948ce7bbc4 100644 --- a/crates/commonware-node/src/executor/actor.rs +++ b/crates/commonware-node/src/executor/actor.rs @@ -16,7 +16,7 @@ //! certificate is received, a stream walking the ancestors of the proposal //! is kickstarted to backfill all blocks to the execution layer. use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque}, pin::Pin, sync::Arc, task::{Poll, ready}, @@ -27,8 +27,8 @@ use alloy_rpc_types_engine::{ ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum, }; use commonware_consensus::{ - CertifiableBlock as _, Epochable, Heightable as _, - types::{Epocher as _, FixedEpocher, Height, Round, View}, + Heightable as _, + types::{Height, Round}, }; use commonware_runtime::{Clock, ContextCell, Handle, Metrics, Pacer, Spawner, spawn_cell}; @@ -46,7 +46,6 @@ use pin_project::pin_project; use reth_ethereum::{chainspec::EthChainSpec, rpc::eth::primitives::BlockNumHash}; use reth_node_builder::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError}; use reth_provider::BlockIdReader as _; -use tempo_chainspec::hardfork::TempoHardforks as _; use tempo_node::{TempoExecutionData, TempoFullNode}; use tempo_payload_types::TempoPayloadAttributes; use tokio::select; @@ -54,7 +53,6 @@ use tracing::{debug, error, info, instrument, warn}; use super::{Config, ingress::MessageWithSpan}; use crate::{ - alias::simplex::Notarization, consensus::{Digest, block::Block}, executor::ingress::{CanonicalizeAndBuild, FinalizedBlock, FinalizedTip}, utils::OptionFuture, @@ -67,9 +65,6 @@ pub(crate) struct Actor { /// and to update the canonical chain by sending forkchoice updates. execution_node: Arc, - /// The epoch strategy used throughout the node. - epoch_strategy: FixedEpocher, - /// The last finalized height as reported by the marshal actor. Important /// when reconciling Consensus Layer and Execution Layer state at startup. last_marshal_finalized_height: Height, @@ -102,31 +97,23 @@ pub(crate) struct Actor { /// forwards block sequentially and without gaps. pending_finalizations: VecDeque, - /// Pending certifications are notarizations that have been been locally - /// executed and verified. - pending_certifications: VecDeque, - - /// A map of notarizations and the round they were received in. - observed_notarizations: BTreeMap, - /// A state submission that is currently in flight to the execution layer. pending_state_submission: OptionFuture>, - /// An in-flight fetch of a notarized block from the marshal actor. - // TODO: Can this be parallelized in any fashion? If we have multiple - // notarization certificates, we should be able to just schedule several - // concurrent walks. - pending_certified_block_fetch: OptionFuture, - - /// A pending request to canonicalize a (notarized/head) block and build - /// a block for a proposal. 
- pending_build_request: Option, - - // A cache of blocks that is continuously populated from notarizations - // coming in. Note that this contains stricly certified blocks only. These - // are blocks that have been notarized and certified. As notarizations come - // in, their parents are assumed notarized and certified. - cache_of_certified_blocks: BTreeMap, + /// A cache of certified blocks, indexed by height and digest. + cache_of_certified_blocks: CertifiedBlockCache, + + /// In-flight proposal build. + build: Option<PendingBuild>, + + /// Highest known sync target — either a locally-certified block (safe + /// to HEAD-advance to) or a notarization target (HEAD-advances only + /// to its parent, which the protocol guarantees is locally + /// certified). + head_sync: Option<HeadSync>, + + /// In-flight sync-pipeline fetch. + pending_sync_fetch: OptionFuture<SyncFetch>, } impl Actor where @@ -141,7 +128,6 @@ where ) -> eyre::Result { let Config { execution_node, - epoch_strategy, last_finalized_height, marshal, fcu_heartbeat_interval, @@ -165,7 +151,6 @@ where Ok(Self { context: ContextCell::new(context), execution_node, - epoch_strategy, last_marshal_finalized_height: last_finalized_height, mailbox, marshal, @@ -179,13 +164,13 @@ where finalized_tip: None, pending_finalizations: VecDeque::new(), - pending_certifications: VecDeque::new(), - observed_notarizations: BTreeMap::new(), - pending_build_request: None, pending_state_submission: OptionFuture::none(), - pending_certified_block_fetch: OptionFuture::none(), - cache_of_certified_blocks: BTreeMap::new(), + cache_of_certified_blocks: CertifiedBlockCache::default(), + + build: None, + head_sync: None, + pending_sync_fetch: OptionFuture::none(), }) } @@ -204,17 +189,19 @@ where } } - rsp = &mut self.pending_certified_block_fetch => { - self.continue_notarization_ancestry_walk(rsp).await; + rsp = &mut self.pending_sync_fetch => { + if let Err(error) = self.handle_sync_fetch_response(rsp) { + warn!(%error, "failed to handle fetched block in the sync pipeline"); + } } msg = self.mailbox.next() => { - let Some(msg) = msg else { break Err(eyre::Report::msg( - "actor mailbox closed unexpectedly" - )); }; + let Some(msg) = msg else { + break Err(eyre::Report::msg("actor mailbox closed unexpectedly")); + }; if let Err(error) = self.handle_message(msg).await { break Err(error).wrap_err( - "executor encountered fatal fork choice update error; \ + "executor encountered fatal error; \ shutting down to prevent consensus-execution divergence" ); } @@ -223,15 +210,13 @@ where () = &mut self.fcu_heartbeat_timer => {}, } - self.prune_certifications(); if let Err(err) = self - .submit_next_state() + .drive_sync() .await - .wrap_err("failed to submit next state") + .wrap_err("executor failed to drive sync") { break Err(err); } - self.kick_off_notarization_ancestry_walk().await; }; match reason { @@ -243,25 +228,58 @@ where async fn handle_message(&mut self, message: MessageWithSpan) -> eyre::Result<()> { let _cause = message.cause; match message.inner { - super::ingress::Message::CanonicalizeAndBuild(canonicalize_and_build) => { - self.pending_build_request.replace(canonicalize_and_build); + super::ingress::Message::CanonicalizeAndBuild(CanonicalizeAndBuild { + height, + digest, + payload_attributes, + response, + }) => { + if let Some(prev) = self.build.take() { + let _ = prev.response.send(Err(eyre!("build request superseded"))); + } + + self.build.replace(PendingBuild { + target_digest: digest, + target_height: height, + payload_attributes, + response, + }); } + super::ingress::Message::Certification(certification)
=> { - self.pending_certifications.push_back(certification); + let incoming_round = certification.round(); + let should_replace = self + .head_sync + .as_ref() + .is_none_or(|cur| incoming_round > cur.target_round); + + if should_replace { + self.head_sync.replace(HeadSync { + target_digest: certification.proposal.payload, + target_round: incoming_round, + target_notarized: false, + }); + } } + super::ingress::Message::Notarization(notarization) => { - if Some(notarization.round()) - > self - .observed_notarizations - .last_key_value() - .map(|(round, _)| *round) - { - self.observed_notarizations - .insert(notarization.round(), notarization); + let incoming_round = notarization.round(); + let should_replace = self + .head_sync + .as_ref() + .is_none_or(|cur| incoming_round > cur.target_round); + + if should_replace { + self.head_sync.replace(HeadSync { + target_digest: notarization.proposal.payload, + target_round: incoming_round, + target_notarized: true, + }); } } + super::ingress::Message::FinalizedBlock(finalized_block) => { - self.pending_finalizations.push_back(finalized_block) + self.pending_finalizations.push_back(*finalized_block); } super::ingress::Message::FinalizedTip(new_tip) => { @@ -271,248 +289,134 @@ where .is_none_or(|old_tip| new_tip.round > old_tip.round) { self.finalized_tip.replace(new_tip); + self.prune() } } } Ok(()) } - fn process_state_submission_response( - &mut self, - StateSubmissionResponse { - fcu_response, - submission_type, - new_payload_response, - submitted_state, - }: StateSubmissionResponse, - ) -> eyre::Result<()> { - let fcu_status = fcu_response.wrap_err( - "communication with execution layer failed for sending forkchoice state state", - )?; - if let Some(new_payload_response) = new_payload_response { - let new_payload_status = new_payload_response - .wrap_err("communication with execution layer failed for sending new payload")?; - - // We require that all blocks sent to the execution layer in - // order and such that a) there are no gaps, and b) that no EL - // pipeline sync is triggered. Hence, the only valid status for - // a notarized and/or finalized block is VALID. - match new_payload_status.status { - PayloadStatusEnum::Valid => {} - other => { - Err(Report::msg(other)).wrap_err("new-payload encountered invalid status")? - } - } - } - // We require that the CL supplies all blocks to the EL in - // order. If any value other than VALID is returned this means - // that either a) the parent was not supplied (which is a - // violation of the previous invariant), or b) that something - // is wrong with the block or status itself. - // - // TODO: should this be relaxed for SubmissionType::Head and - // SubmissionType::Build and only be enforced on SubmissionType::Finalized? - match fcu_status.payload_status.status { - PayloadStatusEnum::Valid => {} - other => { - Err(Report::msg(other)).wrap_err("forkchoice-updated encountered invalid status")? 
+ /// Drops syncing state up to the finalized tip. + fn prune(&mut self) { + if let Some(finalized_tip) = self.finalized_tip.as_ref() { + if self + .head_sync + .as_ref() + .is_some_and(|s| s.target_round <= finalized_tip.round) + { + self.head_sync = None; } - } - match submission_type { - SubmissionType::Build { response } => { - let _ = response.send( - fcu_status - .payload_id - .ok_or_eyre("execution layer response was missing payload ID"), - ); + if let Some(build) = self.build.as_ref() + && build.target_height <= finalized_tip.height + { + debug!(%finalized_tip.height, "pruning build whose target height is now finalized"); + let prev = self.build.take().expect("just checked"); + let _ = prev.response.send(Err(eyre!("build target is finalized"))); } - SubmissionType::Finalized { acknowledgement } => acknowledgement.acknowledge(), - SubmissionType::Head => {} + + self.cache_of_certified_blocks + .retain(|height, _| *height > finalized_tip.height); } + } - self.latest_state = submitted_state; + /// Handles a completed sync-pipeline fetch. + fn handle_sync_fetch_response(&mut self, response: SyncFetchResponse) -> eyre::Result<()> { + let block = response.block_response.wrap_err(format!( + "fetch subscription failed. digest={}", + response.target_digest + ))?; + + // If this response is for the notarized sync target, retarget to its parent (which is certified). + if response.target_notarized + && let Some(head_sync) = self.head_sync.as_mut() + && head_sync.target_digest == response.target_digest + && head_sync.target_notarized + { + head_sync.target_digest = block.parent_digest(); + head_sync.target_notarized = false; + return Ok(()); + } + // Cache the certified block + self.cache_of_certified_blocks.insert(block); Ok(()) } - /// Kicks off an ancestry walk from the oldest certificate received. - async fn kick_off_notarization_ancestry_walk(&mut self) { - // Keep - if self.pending_certified_block_fetch.is_some() { - return; - } - - if let Some((_, notarization)) = self.observed_notarizations.pop_first() { - // If on the first notarization of an epoch, don't start the fetch - // process: the goal is to forward all ancestors to the execution - // layer. But an epoch's first block's parent is the last block - // of the previous epoch. And for that boundary block it is expected - // that a) there exists finalization certificate, that b) this block - // always enters the system through the finalization pipeline, and - // finally c) that simplex engines for a specific epoch are only - // started after the genesis block for that epoch was processed and - // finalized (such that the executor actor would never observe a - // notarization for the new epoch). - if notarization.proposal.parent != View::zero() { - // TODO: investigate how this can be de-asynced. Manual future - // impl? This initial communication with the marshal actor to - // get a subscription should resolve immediately and would only - // stall if the marshal actor is extering backpressure. - let fetch_digest = notarization.proposal.payload; - let subscription = self - .marshal - .subscribe_by_digest(Some(notarization.round()), fetch_digest) - .await; - - self.pending_certified_block_fetch - .replace(FetchCertifiedBlock { - source_notarization: notarization, - fetch_digest, - fetch_height: None, - marshal: self.marshal.clone(), + /// Fetches the next unavailable block in the certified chain. + /// + /// Walks back from `head_sync.target_digest` until it finds the first + /// block not present in the cache.
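+ /// + /// An illustrative trace (hypothetical digests, shown as an `ignore`d + /// doc sketch rather than a runnable test): + /// ```ignore + /// // certified head: d1; head_sync target: d4; cache: {d4, d3} + /// // walk: d4 (cached) -> d3 (cached) -> d2 (missing) => fetch d2 + /// // once d2 is cached and d2.parent == d1, the chain is connected + /// ```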
+ /// + /// Returns `None` when the chain is fully connected from the target back + /// to `latest_state.certified`. This marker is guaranteed to be correct: + /// on a mismatch the driver rolls the EL back to the finalized tip, which + /// lets the walk continue from there. + async fn next_pending_sync_fetch(&mut self) -> Option<SyncFetch> { + let latest_certified_digest = self.latest_state.certified_digest; + let sync = self + .head_sync + .as_ref() + .filter(|s| latest_certified_digest != s.target_digest)?; + + let mut current_digest = sync.target_digest; + loop { + match self + .cache_of_certified_blocks + .get_by_digest(&current_digest) + { + None => { + // First missing link in the chain — fetch this. + let subscription = self.marshal.subscribe_by_digest(None, current_digest).await; + return Some(SyncFetch { + target_digest: current_digest, + target_notarized: sync.target_notarized + && current_digest == sync.target_digest, subscription, }); - } - } - } - - #[instrument( - skip_all, - fields( - %fetch_digest, - fetch_height = fetch_height.map(tracing::field::display), - source_notarization.digest = %source_notarization.proposal.payload, - source_notarization.round = %source_notarization.round(), - ), - )] - async fn continue_notarization_ancestry_walk( - &mut self, - FetchCertifiedBlockResponse { - source_notarization, - marshal, - block_response, - fetch_digest, - fetch_height, - }: FetchCertifiedBlockResponse, - ) { - // TODO: move to a separate handler - match block_response { - Err(error) => { - warn!( - %error, - "an error occured while walking and fetching the \ - notarization ancestors; aborting"); - return; - } - - Ok(block) => { - // Don't cache the block matching the certifcate: it is not - // guaranteed to be certified, only its parent blocks are. - // Certified blocks arrive as `Certified` activity from the - // simplex engine. - if source_notarization.proposal.payload != block.digest() { - self.cache_of_certified_blocks - .insert(block.height(), block.clone()); } - - let parent_digest = block.parent_digest(); - - let epoch_info = self - .epoch_strategy - .containing(block.height()) - .expect("epoch strategy is valid for all heights and epochs"); - - // Only schedule the parent if: - // 1. it is still within the same epoch, - // 2. isn't already cached, - // 3. is ahead of the latest canonicalized head, - if let Some(parent_height) = block.height().previous() - && parent_height >= epoch_info.first() - && self - .cache_of_certified_blocks - .get(&parent_height) - .is_none_or(|cached| cached.digest() != parent_digest) - && (parent_height > self.latest_state.certified_height - || (parent_height == self.latest_state.certified_height - && parent_digest != self.latest_state.certified_digest)) - { - let parent_round = self - .execution_node - .chain_spec() - .is_t4_active_at_timestamp(block.timestamp()) - .then(|| { - let context = block.context(); - let (parent_view, _) = &context.parent; - Round::new(context.epoch(), *parent_view) - }); - - let subscription = marshal - .subscribe_by_digest(parent_round, parent_digest) - .await; - self.pending_certified_block_fetch - .replace(FetchCertifiedBlock { - source_notarization, - fetch_digest: parent_digest, - fetch_height: Some(parent_height), - marshal, - subscription, - }); + Some(b) if b.parent_digest() == self.latest_state.certified_digest => { + // Chain is fully connected from target back to certified. + return None; + } + Some(b) => { + // Continue walking back through cached parents.
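+ // E.g., with the illustrative trace in the doc comment above: from + // `d4` the walk steps to `d3`, then probes `d2`.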
+ current_digest = b.parent_digest(); + } } } } - /// Prunes those certifications and blocks older than what was finalized - /// locally. + /// Drives one syncing step. /// - /// Also cancels those requests that are currently under way so that they - /// never complete. + /// Order of operations: /// - /// Older notarizations will not have their ancestors fetched pro-actively, - /// and older blocks will never be forwarded as head blocks. - fn prune_certifications(&mut self) { - if let Some(finalized_tip) = &self.finalized_tip { - self.observed_notarizations - .retain(|round, _| round > &finalized_tip.round); - self.cache_of_certified_blocks - .retain(|height, _| height > &finalized_tip.height); - - if self - .pending_certified_block_fetch - .as_ref() - .is_some_and(|fetch| fetch.source_notarization.round() <= finalized_tip.round) - { - self.pending_certified_block_fetch.take(); - } + /// 1. If a state submission is in flight, do nothing. + /// 2. Startup backfill. Catch the EL's finalized pointer up to the CL's. + /// 3. If `build` is satisfied (the EL is at the build's parent), fire its FCU + payload attributes. + /// 4. Check whether HEAD has reorged; otherwise submit the next contiguous certified block. + /// 5. Kick off fetching needed blocks. + /// 6. Drain pending finalizations. + /// 7. FCU heartbeat. + #[allow(dead_code)] + async fn drive_sync(&mut self) -> eyre::Result<()> { + // Flush any stale sync targets + if let Some(sync) = self.head_sync.as_ref() + && self.latest_state.certified_digest == sync.target_digest + { + self.head_sync = None; } - } - /// Submits the next block + FCU to the execution layer. - /// - /// The order of blocks and state updates submitted is like this: - /// - /// 1. backfilling has highest priority: if the marshal actor's finalization - /// view is ahead of the execution layer, the node likely suffered a - /// persistence loss after shutdown/restart. - /// 2. `canonicalize-and-build` have highest priority so that proposers can - /// return as proposal as soon as follow. - /// 3. certified blocks follow to ensure the node stays at the tip of the - /// (notarized/certified) tip. - /// 4. notarized (but not yet finalized) blocks to get a lagging node to the - /// tip of the chain as fast as possible. - /// 5. finalized blocks last. - #[instrument(skip_all, err)] - async fn submit_next_state(&mut self) -> eyre::Result<()> { + // 1. FCU in progress if self.pending_state_submission.is_some() { return Ok(()); } + // 2.
Backfill: catch the EL's finalized pointer up to marshal's if self.last_marshal_finalized_height > self.latest_state.finalized_height { debug!( consensus_layer.finalized_height = %self.last_marshal_finalized_height, execution_layer.finalized_height = %self.latest_state.finalized_height, - "gap detected on startup; reconciling consensus and exection layers", + "backfilling finalized blocks", ); let next_block_to_backfill = self.latest_state.finalized_height.next(); let block = self @@ -521,9 +425,9 @@ where .await .ok_or_else(|| { eyre!( - "reconciliation on restart failed; consensus layer is \ - at height `{}` while execution layer is at height `{}`, \ - but consensus layer does not have block `{next_block_to_backfill}`", + "v2 reconciliation failed; consensus layer is at height `{}` while \ + execution layer is at height `{}`, but consensus layer does not have \ + block `{next_block_to_backfill}`", self.last_marshal_finalized_height, self.latest_state.finalized_height, ) @@ -536,32 +440,34 @@ where digest: block.digest(), height: block.height(), block: Some(block), - base_state: self.latest_state.clone(), + base_state: self.latest_state, payload_attributes: None, } .send() .boxed(), ); - } else if let Some(CanonicalizeAndBuild { - height, - digest, - payload_attributes, - response, - }) = self.pending_build_request.take() + return Ok(()); + } + + // 3. Payload Build + if self + .build + .as_ref() + .is_some_and(|b| self.latest_state.certified_digest == b.target_digest) { - debug!( - %height, - %digest, - "setting head hash and kicking off payload build", - ); - // TODO: add a sanity check here: proposals should only work if on - // top of the canonicalized chain. That is, - // if `height = latest.certified_height`. + let PendingBuild { + target_digest, + target_height, + payload_attributes, + response, + } = self.build.take().expect("just checked"); + + debug!(%target_height, %target_digest, "firing build FCU + payload attributes"); self.pending_state_submission.replace( StateSubmission { execution_node: self.execution_node.clone(), - digest, - height, + digest: target_digest, + height: target_height, block: None, submission_type: SubmissionType::Build { response }, base_state: self.latest_state, @@ -570,43 +476,75 @@ where .send() .boxed(), ); - } else if let Some(certification) = self.pending_certifications.pop_front() { - debug!( - certification.round = %certification.round(), - digest = %certification.proposal.payload, - "setting head hash from certified block", - ); - let digest = certification.proposal.payload; - let block = self.marshal.get_block(&digest).await.ok_or_else(|| { - eyre!("we observed a certificatoin for block `{digest}`, we must have it") - })?; - self.pending_state_submission.replace( - StateSubmission { - execution_node: self.execution_node.clone(), - digest, - height: block.height(), - block: Some(block), - submission_type: SubmissionType::Head, - base_state: self.latest_state, - payload_attributes: None, - } - .send() - .boxed(), - ); - } else if let Some(FinalizedBlock { + return Ok(()); + } + + // 4. 
Detect a reorg at the certified head, or advance HEAD to the next certified block + let next_height = self.latest_state.certified_height.next(); + if let Some(next) = self.cache_of_certified_blocks.get_by_height(&next_height) { + if next.parent_digest() != self.latest_state.certified_digest { + warn!( + height = %self.latest_state.certified_height, + old.head.digest = %self.latest_state.certified_digest, + reported.head.digest = %next.parent_digest(), + "sibling fork detected at certified head; rolling EL back to finalized", + ); + + let rolled_back = self.latest_state.with_rolled_back_to_finalized(); + self.pending_state_submission.replace( + StateSubmission { + execution_node: self.execution_node.clone(), + digest: rolled_back.finalized_digest, + height: rolled_back.finalized_height, + block: None, + submission_type: SubmissionType::Head, + base_state: rolled_back, + payload_attributes: None, + } + .send() + .boxed(), + ); + } else { + let block = self + .cache_of_certified_blocks + .remove_by_height(&next_height) + .expect("just observed"); + + debug!(block.height = %block.height(), block.digest = %block.digest(), "setting head hash"); + self.pending_state_submission.replace( + StateSubmission { + execution_node: self.execution_node.clone(), + digest: block.digest(), + height: block.height(), + block: Some(block), + submission_type: SubmissionType::Head, + base_state: self.latest_state, + payload_attributes: None, + } + .send() + .boxed(), + ); + } + + return Ok(()); + } + + // 5. Fetch missing blocks + if self.pending_sync_fetch.is_none() + && let Some(next) = self.next_pending_sync_fetch().await + { + debug!(digest=%next.target_digest, "fetching ancestor"); + self.pending_sync_fetch.replace(next); + return Ok(()); + } + + // 6. Drain the marshal finalization pipeline. + if let Some(FinalizedBlock { block, acknowledgement, }) = self.pending_finalizations.pop_front() { - debug!( - block.height = %block.height(), - block.digest = %block.digest(), - "finalizing block", - ); - // TODO: need to assert contiguity - the finalized block here must be - // on top of the last finalized block as per the execution layer. Need - // to ensure this at startup so that it holds for the lifetime of the - // actor. + debug!(block.height = %block.height(), block.digest = %block.digest(), "finalizing block"); self.pending_state_submission.replace( StateSubmission { execution_node: self.execution_node.clone(), @@ -620,29 +558,11 @@ where .send() .boxed(), ); - } else if let Some(block) = self - .cache_of_certified_blocks - .remove(&self.latest_state.certified_height.next()) - { - debug!( - block.height = %block.height(), - block.digest = %block.digest(), - "setting head hash from certified block", - ); - self.pending_state_submission.replace( - StateSubmission { - execution_node: self.execution_node.clone(), - digest: block.digest(), - height: block.height(), - block: Some(block), - submission_type: SubmissionType::Head, - base_state: self.latest_state, - payload_attributes: None, - } - .send() - .boxed(), - ); - } else if self.fcu_heartbeat_timer.is_none() { + return Ok(()); + } + + // 7.
Heartbeat if self.fcu_heartbeat_timer.is_none() { debug!("heartbeat timer fired, resending latest FCU"); self.pending_state_submission.replace( StateSubmission { execution_node: self.execution_node.clone(), @@ -657,11 +577,70 @@ where .send() .boxed(), ); + return Ok(()); } - if self.pending_state_submission.is_some() { - self.fcu_heartbeat_timer - .replace(self.context.sleep(self.fcu_heartbeat_interval).boxed()); + + Ok(()) + } + + fn process_state_submission_response( + &mut self, + StateSubmissionResponse { + fcu_response, + submission_type, + new_payload_response, + submitted_state, + }: StateSubmissionResponse, + ) -> eyre::Result<()> { + let fcu_status = fcu_response.wrap_err( + "communication with execution layer failed for sending forkchoice state", + )?; + if let Some(new_payload_response) = new_payload_response { + let new_payload_status = new_payload_response + .wrap_err("communication with execution layer failed for sending new payload")?; + + // We require that all blocks are sent to the execution layer in + // order, such that a) there are no gaps, and b) no EL + // pipeline sync is triggered. Hence, the only valid status for + // a notarized and/or finalized block is VALID. + match new_payload_status.status { + PayloadStatusEnum::Valid => {} + other => { + Err(Report::msg(other)).wrap_err("new-payload encountered invalid status")? + } + } + } + // We require that the CL supplies all blocks to the EL in + // order. If any value other than VALID is returned this means + // that either a) the parent was not supplied (which is a + // violation of the previous invariant), or b) that something + // is wrong with the block or status itself. + // + // TODO: should this be relaxed for SubmissionType::Head and + // SubmissionType::Build and only be enforced on SubmissionType::Finalized? + match fcu_status.payload_status.status { + PayloadStatusEnum::Valid => {} + other => { + Err(Report::msg(other)).wrap_err("forkchoice-updated encountered invalid status")? + } } + + match submission_type { + SubmissionType::Build { response } => { + let _ = response.send( + fcu_status + .payload_id + .ok_or_eyre("execution layer response was missing payload ID"), + ); + } + SubmissionType::Finalized { acknowledgement } => acknowledgement.acknowledge(), + SubmissionType::Head => {} + } + + self.latest_state = submitted_state; + self.fcu_heartbeat_timer + .replace(self.context.sleep(self.fcu_heartbeat_interval).boxed()); + Ok(()) } } @@ -676,60 +655,53 @@ enum SubmissionType { Head, } -struct FetchCertifiedBlockResponse { - /// The notarization that kicked off the request. - /// Note: this notarization is not expected to be certified at the moment - /// of receipt! - source_notarization: Notarization, - - /// The digest of the block fetched. - fetch_digest: Digest, - - /// The height of the block fetched (if known at the moment the block was - /// scheduled). - fetch_height: Option, - - /// The mailbox of the marshal actor to which the request will be made. - marshal: crate::alias::marshal::Mailbox, - - block_response: Result, +/// An in-flight fetch of a single block along the certified ancestry walk. +struct SyncFetch { + target_digest: Digest, + target_notarized: bool, + subscription: tokio::sync::oneshot::Receiver<Block>, } -/// A request to the marshal actor to return a given block. -struct FetchCertifiedBlock { - /// The notarization that kicked off the request. - /// Note: this notarization is not expected to be certified at the moment - /// of receipt! - source_notarization: Notarization, - - /// The digest of the block to be fetched.
- fetch_digest: Digest, - - /// The height of the block to be fetched, if known. - fetch_height: Option, - - /// The mailbox of the marshal actor to which the request will be made. - marshal: crate::alias::marshal::Mailbox, - - /// An ongoing subscription to the - subscription: tokio::sync::oneshot::Receiver, +/// The resolved outcome of a [`SyncFetch`]. +#[derive(Debug)] +struct SyncFetchResponse { + target_digest: Digest, + target_notarized: bool, + block_response: Result, } -impl Future for FetchCertifiedBlock { - type Output = FetchCertifiedBlockResponse; +impl Future for SyncFetch { + type Output = SyncFetchResponse; fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { let block_response = ready!(self.subscription.poll_unpin(cx)); - Poll::Ready(FetchCertifiedBlockResponse { - source_notarization: self.source_notarization.clone(), - marshal: self.marshal.clone(), + Poll::Ready(SyncFetchResponse { + target_digest: self.target_digest, + target_notarized: self.target_notarized, block_response, - fetch_digest: self.fetch_digest, - fetch_height: self.fetch_height, }) } } +/// A desired EL canonical head. +struct HeadSync { + target_digest: Digest, + target_round: Round, + + /// If this target came from a notarization (vs a certification). The + /// notarization itself is not safe to set as HEAD; only its parent is. + target_notarized: bool, +} + +/// A pending payload-build request. Fires once the EL's certified head reaches +/// `target_digest`. Backfill is driven by `head_sync` — the build target is, +/// by construction, locally certified and on `head_sync`'s walk path, so this +/// slot is purely a passive trigger. +struct PendingBuild { + target_digest: Digest, + target_height: Height, + payload_attributes: Box<TempoPayloadAttributes>, + response: oneshot::Sender<eyre::Result<PayloadId>>, +} + /// A state submission that is currently in flight to the execution layer. /// /// A state submission consists of 3 parts: @@ -791,11 +763,13 @@ impl StateSubmission { } SubmissionType::Finalized { .. } => base_state.with_updated_finalized(height, digest), }; + let fcu_response = execution_node .add_ons_handle .beacon_engine_handle .fork_choice_updated(submitted_state.to_fcu_state(), payload_attributes.take()) .await; + StateSubmissionResponse { submission_type, fcu_response, @@ -812,6 +786,60 @@ struct StateSubmissionResponse { submitted_state: LatestState, } +/// A cache of `Block`s indexed by both height and digest. +#[derive(Default)] +struct CertifiedBlockCache { + by_height: BTreeMap<Height, Block>, + by_digest: HashMap<Digest, Height>, +} + +impl CertifiedBlockCache { + /// Inserts `block`, replacing any prior block at the same height. + /// + /// If a different block previously occupied this height, its digest + /// entry is removed from the index before the new digest is inserted. + fn insert(&mut self, block: Block) { + let height = block.height(); + let digest = block.digest(); + if let Some(prev) = self.by_height.insert(height, block) + && prev.digest() != digest + { + self.by_digest.remove(&prev.digest()); + } + + self.by_digest.insert(digest, height); + } + + fn get_by_height(&self, height: &Height) -> Option<&Block> { + self.by_height.get(height) + } + + fn get_by_digest(&self, digest: &Digest) -> Option<&Block> { + let height = self.by_digest.get(digest)?; + self.by_height.get(height) + } + + fn remove_by_height(&mut self, height: &Height) -> Option<Block> { + let block = self.by_height.remove(height)?; + self.by_digest.remove(&block.digest()); + Some(block) + } + + /// Mirrors `BTreeMap::retain`, keeping the digest index consistent.
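+ /// + /// A usage sketch (illustrative only: `mk_block` is a hypothetical test + /// helper and `finalized_height` a stand-in value, hence the `ignore`): + /// ```ignore + /// let mut cache = CertifiedBlockCache::default(); + /// let block = mk_block(7); // hypothetical: a certified block at height 7 + /// let digest = block.digest(); + /// cache.insert(block); + /// // Drop everything at or below the finalized height; assuming + /// // `finalized_height >= 7`, both indices forget the block. + /// cache.retain(|height, _| *height > finalized_height); + /// assert!(cache.get_by_digest(&digest).is_none()); + /// ```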
+ fn retain<F>(&mut self, mut f: F) + where + F: FnMut(&Height, &Block) -> bool, + { + self.by_height.retain(|height, block| { + let keep = f(height, block); + if !keep { + self.by_digest.remove(&block.digest()); + } + keep + }); + } +} + #[derive(Clone, Copy, Debug)] struct LatestState { finalized_digest: Digest, @@ -835,7 +863,7 @@ impl LatestState { ForkchoiceState { head_block_hash: self.certified_digest.0, safe_block_hash: self.finalized_digest.0, - finalized_block_hash: self.certified_digest.0, + finalized_block_hash: self.finalized_digest.0, } } @@ -860,4 +888,14 @@ impl LatestState { } this } + + /// Rolls the optimistic certified pointer back to match the finalized + /// pointer. Used to express a reorg at the certified head; bypasses + /// the monotonicity of `with_updated_certification`. + fn with_rolled_back_to_finalized(&self) -> Self { + let mut this = *self; + this.certified_height = this.finalized_height; + this.certified_digest = this.finalized_digest; + this + } } diff --git a/crates/commonware-node/src/executor/ingress.rs b/crates/commonware-node/src/executor/ingress.rs index 43814fe893..4bf03a4bb6 100644 --- a/crates/commonware-node/src/executor/ingress.rs +++ b/crates/commonware-node/src/executor/ingress.rs @@ -107,7 +107,7 @@ pub(super) enum Message { // === /// A finalized block received from the marshal actor and waiting to be /// executed and acknowledged. - FinalizedBlock(FinalizedBlock), + FinalizedBlock(Box<FinalizedBlock>), /// The highest finalized tip known to the marshal actor. FinalizedTip(FinalizedTip), @@ -131,7 +131,6 @@ pub(super) struct FinalizedTip { pub(super) round: Round, pub(super) height: Height, - pub(super) digest: Digest, } impl From for Message { @@ -142,13 +141,13 @@ impl From for Message { fn from(value: FinalizedBlock) -> Self { - Message::FinalizedBlock(value) + Self::FinalizedBlock(Box::new(value)) } } impl From for Message { fn from(value: FinalizedTip) -> Self { - Message::FinalizedTip(value) + Self::FinalizedTip(value) } } @@ -179,12 +178,7 @@ impl Reporter for MarshalReporter { acknowledgement, } .into(), - Update::Tip(round, height, digest) => FinalizedTip { - round, - height, - digest, - } - .into(), + Update::Tip(round, height, _) => FinalizedTip { round, height }.into(), }; self.0 .inner diff --git a/crates/commonware-node/src/executor/mod.rs b/crates/commonware-node/src/executor/mod.rs index 1512c4487c..96fbea5a7c 100644 --- a/crates/commonware-node/src/executor/mod.rs +++ b/crates/commonware-node/src/executor/mod.rs @@ -1,7 +1,7 @@ //! The executor is sending fork-choice-updates to the execution layer. use std::sync::Arc; -use commonware_consensus::types::{FixedEpocher, Height}; +use commonware_consensus::types::Height; use commonware_runtime::{Clock, Metrics, Pacer, Spawner}; mod actor; @@ -31,10 +31,6 @@ pub(crate) struct Config { /// and to update the canonical chain by sending forkchoice updates. pub(crate) execution_node: Arc, - /// The epoch strategy for the actor to determine when it should stop - /// fetching backfilling block. - pub(crate) epoch_strategy: FixedEpocher, - /// The last finalized height according to the consensus layer.
/// If on startup there is a mismatch between the execution layer and the /// consensus, then the node will fill the gap by backfilling blocks to diff --git a/crates/commonware-node/src/utils.rs b/crates/commonware-node/src/utils.rs index 3691b7ca8c..42fa496b71 100644 --- a/crates/commonware-node/src/utils.rs +++ b/crates/commonware-node/src/utils.rs @@ -55,10 +55,6 @@ impl OptionFuture { pub(crate) fn is_some(&self) -> bool { self.0.is_some() } - - pub(crate) fn as_ref(&mut self) -> Option<&F> { - self.0.as_ref() - } } impl Default for OptionFuture {