diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 51e487cab..995aaca10 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -606,7 +606,7 @@ impl Db { /// /// This includes the genesis block, which is not technically proven, but treated as such. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] - pub async fn select_latest_proven_in_sequence_block_num(&self) -> Result { + pub async fn proven_chain_tip(&self) -> Result { self.transact("select latest proven block num", |conn| { models::queries::select_latest_proven_in_sequence_block_num(conn) }) diff --git a/crates/store/src/server/block_producer.rs b/crates/store/src/server/block_producer.rs index 5cbe3563a..dc59da94c 100644 --- a/crates/store/src/server/block_producer.rs +++ b/crates/store/src/server/block_producer.rs @@ -25,6 +25,7 @@ use crate::server::api::{ validate_note_commitments, validate_nullifiers, }; +use crate::state::Finality; // BLOCK PRODUCER ENDPOINTS // ================================================================================================ @@ -215,7 +216,7 @@ impl block_producer_server::BlockProducer for StoreApi { .inspect_err(|err| tracing::Span::current().set_error(err)) .map_err(|err| tonic::Status::internal(err.as_report()))?; - let block_height = self.state.latest_block_num().await.as_u32(); + let block_height = self.state.chain_tip(Finality::Committed).await.as_u32(); Ok(Response::new(proto::store::TransactionInputs { account_state: Some(proto::store::transaction_inputs::AccountTransactionInputRecord { diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 85292bb72..e93b0f4b8 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -15,6 +15,7 @@ use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn}; use miden_node_utils::tracing::grpc::grpc_trace_fn; use tokio::net::TcpListener; +use tokio::sync::watch; use tokio::task::JoinSet; use tokio_stream::wrappers::TcpListenerStream; use tower_http::trace::TraceLayer; @@ -94,33 +95,90 @@ impl Store { let (termination_ask, mut termination_signal) = tokio::sync::mpsc::channel::(1); - let state = Arc::new( + let (state, tx_proven_tip) = State::load(&self.data_directory, self.storage_options, termination_ask) .await - .context("failed to load state")?, - ); + .context("failed to load state")?; + + let (proof_scheduler_task, chain_tip_sender) = Self::spawn_proof_scheduler( + &state, + self.block_prover_url, + self.max_concurrent_proofs, + tx_proven_tip, + ) + .await; - // Initialize local or remote block prover. - let block_prover = if let Some(url) = self.block_prover_url { + let mut join_set = Self::spawn_grpc_servers( + state, + chain_tip_sender, + self.grpc_options, + self.rpc_listener, + self.ntx_builder_listener, + self.block_producer_listener, + )?; + + let service = async move { + join_set.join_next().await.expect("joinset is not empty")?.map_err(Into::into) + }; + tokio::select! { + result = service => result, + Some(err) = termination_signal.recv() => { + Err(anyhow::anyhow!("received termination signal").context(err)) + }, + result = proof_scheduler_task => { + match result { + Ok(Ok(())) => Err(anyhow::anyhow!("proof scheduler exited unexpectedly")), + Ok(Err(err)) => Err(err.context("proof scheduler fatal error")), + Err(join_err) => Err(join_err).context("proof scheduler panicked"), + } + } + } + } + + /// Initializes the block prover client and spawns the proof scheduler as a background task. + /// + /// Returns the scheduler task handle and the chain tip sender (needed by gRPC services to + /// notify the scheduler of new blocks). + async fn spawn_proof_scheduler( + state: &State, + block_prover_url: Option, + max_concurrent_proofs: NonZeroUsize, + proven_tip_tx: watch::Sender, + ) -> ( + tokio::task::JoinHandle>, + watch::Sender, + ) { + let block_prover = if let Some(url) = block_prover_url { Arc::new(BlockProver::remote(url)) } else { Arc::new(BlockProver::local()) }; - // Initialize the chain tip watch channel. - let chain_tip = state.latest_block_num().await; - let (chain_tip_sender, chain_tip_rx) = tokio::sync::watch::channel(chain_tip); + let chain_tip = state.chain_tip(crate::state::Finality::Committed).await; + let (chain_tip_tx, chain_tip_rx) = watch::channel(chain_tip); - // Spawn the proof scheduler as a background task. It will immediately pick up any - // unproven blocks from previous runs and begin proving them. - let proof_scheduler_task = proof_scheduler::spawn( + let handle = proof_scheduler::spawn( state.db().clone(), block_prover, state.block_store(), chain_tip_rx, - self.max_concurrent_proofs, + proven_tip_tx, + max_concurrent_proofs, ); + (handle, chain_tip_tx) + } + + /// Spawns the gRPC servers and the DB maintenance background task. + fn spawn_grpc_servers( + state: State, + chain_tip_sender: watch::Sender, + grpc_options: GrpcOptionsInternal, + rpc_listener: TcpListener, + ntx_builder_listener: TcpListener, + block_producer_listener: TcpListener, + ) -> anyhow::Result>> { + let state = Arc::new(state); let rpc_service = store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state), chain_tip_sender: chain_tip_sender.clone(), @@ -150,61 +208,45 @@ impl Store { // // 5 minutes seems like a reasonable interval, where this should have minimal database // IO impact while providing a decent view into table growth over time. - let mut interval = tokio::time::interval(Duration::from_secs(5 * 60)); - let database = Arc::clone(&state); + let mut interval = tokio::time::interval(Duration::from_mins(5)); loop { interval.tick().await; - let _ = database.analyze_table_sizes().await; + let _ = state.analyze_table_sizes().await; } }); - // Build the gRPC server with the API services and trace layer. join_set.spawn( tonic::transport::Server::builder() - .timeout(self.grpc_options.request_timeout) + .timeout(grpc_options.request_timeout) .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .add_service(rpc_service) .add_service(reflection_service.clone()) - .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)), + .serve_with_incoming(TcpListenerStream::new(rpc_listener)), ); join_set.spawn( tonic::transport::Server::builder() - .timeout(self.grpc_options.request_timeout) + .timeout(grpc_options.request_timeout) .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .add_service(ntx_builder_service) .add_service(reflection_service.clone()) - .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)), + .serve_with_incoming(TcpListenerStream::new(ntx_builder_listener)), ); join_set.spawn( tonic::transport::Server::builder() .accept_http1(true) - .timeout(self.grpc_options.request_timeout) + .timeout(grpc_options.request_timeout) .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .add_service(block_producer_service) .add_service(reflection_service) - .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)), + .serve_with_incoming(TcpListenerStream::new(block_producer_listener)), ); - // SAFETY: The joinset is definitely not empty. - let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) }; - tokio::select! { - result = service => result, - Some(err) = termination_signal.recv() => { - Err(anyhow::anyhow!("received termination signal").context(err)) - }, - result = proof_scheduler_task => { - match result { - Ok(Ok(())) => Err(anyhow::anyhow!("proof scheduler exited unexpectedly")), - Ok(Err(err)) => Err(err.context("proof scheduler fatal error")), - Err(join_err) => Err(join_err).context("proof scheduler panicked"), - } - } - } + Ok(join_set) } } diff --git a/crates/store/src/server/ntx_builder.rs b/crates/store/src/server/ntx_builder.rs index 41217c4da..10487723e 100644 --- a/crates/store/src/server/ntx_builder.rs +++ b/crates/store/src/server/ntx_builder.rs @@ -31,6 +31,7 @@ use crate::server::api::{ read_block_range, read_root, }; +use crate::state::Finality; // NTX BUILDER ENDPOINTS // ================================================================================================ @@ -142,7 +143,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { ) -> Result, Status> { let request = request.into_inner(); - let mut chain_tip = self.state.latest_block_num().await; + let mut chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::(Some(request), "GetNetworkAccountIds")? .into_inclusive_range::(&chain_tip)?; @@ -156,7 +157,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { last_block_included = chain_tip; } - chain_tip = self.state.latest_block_num().await; + chain_tip = self.state.chain_tip(Finality::Committed).await; Ok(Response::new(proto::store::NetworkAccountIdList { account_ids, @@ -245,7 +246,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { let block_num = if let Some(num) = request.block_num { num.into() } else { - self.state.latest_block_num().await + self.state.chain_tip(Finality::Committed).await }; // Retrieve the asset witnesses. @@ -296,7 +297,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { let block_num = if let Some(num) = request.block_num { num.into() } else { - self.state.latest_block_num().await + self.state.chain_tip(Finality::Committed).await }; // Retrieve the storage map witness. @@ -313,7 +314,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { key: Some(map_key.into()), proof: Some(proof.into()), }), - block_num: self.state.latest_block_num().await.as_u32(), + block_num: self.state.chain_tip(Finality::Committed).await.as_u32(), })) } } diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index 38fae3670..ba3275584 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -59,13 +59,18 @@ impl ProofTaskJoinSet { db: &Arc, block_prover: &Arc, block_store: &Arc, + proven_tip_tx: &watch::Sender, block_num: BlockNumber, ) { let db = Arc::clone(db); let block_prover = Arc::clone(block_prover); let block_store = Arc::clone(block_store); - self.0 - .spawn(async move { prove_block(&db, &block_prover, &block_store, block_num).await }); + self.0.spawn({ + let proven_tip_tx = proven_tip_tx.clone(); + async move { + prove_block(&db, &block_prover, &block_store, &proven_tip_tx, block_num).await + } + }); } /// Returns the result of the next completed task, or pends forever if the set is empty. @@ -98,9 +103,17 @@ pub fn spawn( block_prover: Arc, block_store: Arc, chain_tip_rx: watch::Receiver, + proven_tip_tx: watch::Sender, max_concurrent_proofs: NonZeroUsize, ) -> JoinHandle> { - tokio::spawn(run(db, block_prover, block_store, chain_tip_rx, max_concurrent_proofs)) + tokio::spawn(run( + db, + block_prover, + block_store, + chain_tip_rx, + proven_tip_tx, + max_concurrent_proofs, + )) } /// Main loop of the proof scheduler. @@ -117,6 +130,7 @@ async fn run( block_prover: Arc, block_store: Arc, mut chain_tip_rx: watch::Receiver, + proven_tip_tx: watch::Sender, max_concurrent_proofs: NonZeroUsize, ) -> anyhow::Result<()> { info!(target: COMPONENT, "Proof scheduler started"); @@ -127,7 +141,7 @@ async fn run( // Highest block number that is in-flight or has been proven. Used to avoid re-querying // blocks we've already scheduled. Initialized from the in-sequence tip so we skip // already-proven blocks on restart. - let mut highest_scheduled = db.select_latest_proven_in_sequence_block_num().await?; + let mut highest_scheduled = db.proven_chain_tip().await?; loop { // Query the DB for unproven blocks beyond what we've already scheduled. @@ -140,7 +154,7 @@ async fn run( } for block_num in unproven { - join_set.spawn(&db, &block_prover, &block_store, block_num); + join_set.spawn(&db, &block_prover, &block_store, &proven_tip_tx, block_num); } } @@ -171,6 +185,7 @@ async fn prove_block( db: &Db, block_prover: &BlockProver, block_store: &BlockStore, + proven_tip_tx: &watch::Sender, block_num: BlockNumber, ) -> anyhow::Result<()> { const MAX_RETRIES: u32 = 10; @@ -189,6 +204,7 @@ async fn prove_block( // Mark the block as proven and advance the sequence in the database. let advanced_in_sequence = db.mark_proven_and_advance_sequence(block_num).await?; if let Some(&last) = advanced_in_sequence.last() { + proven_tip_tx.send(last)?; info!( target = COMPONENT, block.number = %block_num, diff --git a/crates/store/src/server/rpc_api.rs b/crates/store/src/server/rpc_api.rs index 2ef33295e..448e4d0e4 100644 --- a/crates/store/src/server/rpc_api.rs +++ b/crates/store/src/server/rpc_api.rs @@ -41,6 +41,18 @@ use crate::server::api::{ read_root, validate_nullifiers, }; +use crate::state::Finality; + +impl From for Finality { + fn from(finality: proto::rpc::Finality) -> Self { + match finality { + proto::rpc::Finality::Unspecified | proto::rpc::Finality::Committed => { + Finality::Committed + }, + proto::rpc::Finality::Proven => Finality::Proven, + } + } +} // CLIENT ENDPOINTS // ================================================================================================ @@ -94,7 +106,7 @@ impl rpc_server::Rpc for StoreApi { return Err(SyncNullifiersError::InvalidPrefixLength(request.prefix_len).into()); } - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::(request.block_range, "SyncNullifiersRequest")? .into_inclusive_range::(&chain_tip)?; @@ -129,7 +141,7 @@ impl rpc_server::Rpc for StoreApi { ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::(request.block_range, "SyncNotesRequest")? .into_inclusive_range::(&chain_tip)?; @@ -159,31 +171,20 @@ impl rpc_server::Rpc for StoreApi { request: Request, ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; let block_range = request .block_range .ok_or_else(|| proto::rpc::SyncChainMmrRequest::missing_field(stringify!(block_range))) .map_err(SyncChainMmrError::DeserializationFailed)?; - // Determine the effective tip based on the requested finality level. - let effective_tip = match request.finality() { - proto::rpc::Finality::Unspecified | proto::rpc::Finality::Committed => chain_tip, - proto::rpc::Finality::Proven => self - .state - .db() - .select_latest_proven_in_sequence_block_num() - .await - .map_err(SyncChainMmrError::DatabaseError)?, - }; + let chain_tip = self.state.chain_tip(request.finality().into()).await; let block_from = BlockNumber::from(block_range.block_from); - if block_from > effective_tip { - Err(SyncChainMmrError::FutureBlock { chain_tip: effective_tip, block_from })?; + if block_from > chain_tip { + Err(SyncChainMmrError::FutureBlock { chain_tip, block_from })?; } - let block_to = - block_range.block_to.map_or(effective_tip, BlockNumber::from).min(effective_tip); + let block_to = block_range.block_to.map_or(chain_tip, BlockNumber::from).min(chain_tip); if block_from > block_to { Err(SyncChainMmrError::InvalidBlockRange(InvalidBlockRange::StartGreaterThanEnd { @@ -270,7 +271,7 @@ impl rpc_server::Rpc for StoreApi { request: Request, ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let account_id: AccountId = read_account_id::(request.account_id)?; @@ -326,7 +327,7 @@ impl rpc_server::Rpc for StoreApi { Err(SyncAccountStorageMapsError::AccountNotPublic(account_id))?; } - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::( request.block_range, "SyncAccountStorageMapsRequest", @@ -366,7 +367,7 @@ impl rpc_server::Rpc for StoreApi { Ok(Response::new(proto::rpc::StoreStatus { version: env!("CARGO_PKG_VERSION").to_string(), status: "connected".to_string(), - chain_tip: self.state.latest_block_num().await.as_u32(), + chain_tip: self.state.chain_tip(Finality::Committed).await.as_u32(), })) } @@ -398,7 +399,7 @@ impl rpc_server::Rpc for StoreApi { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::( request.block_range, "SyncTransactionsRequest", diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index 418c890de..38df4b14a 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -34,7 +34,7 @@ use miden_protocol::crypto::merkle::mmr::{MmrPeaks, MmrProof, PartialMmr}; use miden_protocol::crypto::merkle::smt::{LargeSmt, SmtProof, SmtStorage}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; use miden_protocol::transaction::PartialBlockchain; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock, watch}; use tracing::{info, instrument}; use crate::account_state_forest::{AccountStateForest, WitnessError}; @@ -69,6 +69,18 @@ use loader::{ mod apply_block; mod sync_state; +// FINALITY +// ================================================================================================ + +/// The finality level for chain tip queries. +#[derive(Debug, Clone, Copy)] +pub enum Finality { + /// The latest committed (but not necessarily proven) block. + Committed, + /// The latest block that has been proven in an unbroken sequence from genesis. + Proven, +} + // STRUCTURES // ================================================================================================ @@ -125,6 +137,9 @@ pub struct State { /// Request termination of the process due to a fatal internal state error. termination_ask: tokio::sync::mpsc::Sender, + + /// The latest proven-in-sequence block number, updated by the proof scheduler. + proven_tip_rx: watch::Receiver, } impl State { @@ -137,7 +152,7 @@ impl State { data_path: &Path, storage_options: StorageOptions, termination_ask: tokio::sync::mpsc::Sender, - ) -> Result { + ) -> Result<(Self, watch::Sender), StateInitializationError> { let data_directory = DataDirectory::load(data_path.to_path_buf()) .map_err(StateInitializationError::DataDirectoryLoadError)?; @@ -183,14 +198,23 @@ impl State { let writer = Mutex::new(()); let db = Arc::new(db); - Ok(Self { - db, - block_store, - inner, - forest, - writer, - termination_ask, - }) + // Initialize the proven tip from database. + let proven_tip = + db.proven_chain_tip().await.map_err(StateInitializationError::DatabaseError)?; + let (proven_tip_tx, proven_tip_rx) = watch::channel(proven_tip); + + Ok(( + Self { + db, + block_store, + inner, + forest, + writer, + termination_ask, + proven_tip_rx, + }, + proven_tip_tx, + )) } /// Returns the database. @@ -264,7 +288,7 @@ impl State { ) -> Result, GetCurrentBlockchainDataError> { let blockchain = &self.inner.read().await.blockchain; if let Some(number) = block_num - && number == self.latest_block_num().await + && number == self.chain_tip(Finality::Committed).await { return Ok(None); } @@ -836,15 +860,22 @@ impl State { &self, block_num: BlockNumber, ) -> Result>, DatabaseError> { - if block_num > self.latest_block_num().await { + if block_num > self.chain_tip(Finality::Committed).await { return Ok(None); } self.block_store.load_block(block_num).await.map_err(Into::into) } - /// Returns the latest block number. - pub async fn latest_block_num(&self) -> BlockNumber { - self.inner.read().await.latest_block_num() + /// Returns the effective chain tip for the given finality level. + /// + /// - [`Finality::Committed`]: returns the latest committed block number (from in-memory MMR). + /// - [`Finality::Proven`]: returns the latest proven-in-sequence block number (cached via watch + /// channel, updated by the proof scheduler). + pub async fn chain_tip(&self, finality: Finality) -> BlockNumber { + match finality { + Finality::Committed => self.inner.read().await.latest_block_num(), + Finality::Proven => *self.proven_tip_rx.borrow(), + } } /// Emits metrics for each database table's size.