From 8e6a1ebe757bafc45f806ca1af9c998d1f936262 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 30 Mar 2026 15:55:54 +1300 Subject: [PATCH 01/10] Cache proven chain tip in State --- crates/store/src/server/block_producer.rs | 3 +- crates/store/src/server/mod.rs | 9 ++-- crates/store/src/server/ntx_builder.rs | 11 ++-- crates/store/src/server/proof_scheduler.rs | 22 ++++++-- crates/store/src/server/rpc_api.rs | 43 +++++++-------- crates/store/src/state/mod.rs | 63 ++++++++++++++++------ 6 files changed, 101 insertions(+), 50 deletions(-) diff --git a/crates/store/src/server/block_producer.rs b/crates/store/src/server/block_producer.rs index 5cbe3563a..dc59da94c 100644 --- a/crates/store/src/server/block_producer.rs +++ b/crates/store/src/server/block_producer.rs @@ -25,6 +25,7 @@ use crate::server::api::{ validate_note_commitments, validate_nullifiers, }; +use crate::state::Finality; // BLOCK PRODUCER ENDPOINTS // ================================================================================================ @@ -215,7 +216,7 @@ impl block_producer_server::BlockProducer for StoreApi { .inspect_err(|err| tracing::Span::current().set_error(err)) .map_err(|err| tonic::Status::internal(err.as_report()))?; - let block_height = self.state.latest_block_num().await.as_u32(); + let block_height = self.state.chain_tip(Finality::Committed).await.as_u32(); Ok(Response::new(proto::store::TransactionInputs { account_state: Some(proto::store::transaction_inputs::AccountTransactionInputRecord { diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 85292bb72..3ef35d1a7 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -94,11 +94,11 @@ impl Store { let (termination_ask, mut termination_signal) = tokio::sync::mpsc::channel::(1); - let state = Arc::new( + let (state, proven_tip_sender) = State::load(&self.data_directory, self.storage_options, termination_ask) .await - .context("failed to load state")?, - ); + .context("failed to load state")?; + let state = Arc::new(state); // Initialize local or remote block prover. let block_prover = if let Some(url) = self.block_prover_url { @@ -108,7 +108,7 @@ impl Store { }; // Initialize the chain tip watch channel. - let chain_tip = state.latest_block_num().await; + let chain_tip = state.chain_tip(crate::state::Finality::Committed).await; let (chain_tip_sender, chain_tip_rx) = tokio::sync::watch::channel(chain_tip); // Spawn the proof scheduler as a background task. It will immediately pick up any @@ -118,6 +118,7 @@ impl Store { block_prover, state.block_store(), chain_tip_rx, + proven_tip_sender, self.max_concurrent_proofs, ); diff --git a/crates/store/src/server/ntx_builder.rs b/crates/store/src/server/ntx_builder.rs index 41217c4da..10487723e 100644 --- a/crates/store/src/server/ntx_builder.rs +++ b/crates/store/src/server/ntx_builder.rs @@ -31,6 +31,7 @@ use crate::server::api::{ read_block_range, read_root, }; +use crate::state::Finality; // NTX BUILDER ENDPOINTS // ================================================================================================ @@ -142,7 +143,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { ) -> Result, Status> { let request = request.into_inner(); - let mut chain_tip = self.state.latest_block_num().await; + let mut chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::(Some(request), "GetNetworkAccountIds")? .into_inclusive_range::(&chain_tip)?; @@ -156,7 +157,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { last_block_included = chain_tip; } - chain_tip = self.state.latest_block_num().await; + chain_tip = self.state.chain_tip(Finality::Committed).await; Ok(Response::new(proto::store::NetworkAccountIdList { account_ids, @@ -245,7 +246,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { let block_num = if let Some(num) = request.block_num { num.into() } else { - self.state.latest_block_num().await + self.state.chain_tip(Finality::Committed).await }; // Retrieve the asset witnesses. @@ -296,7 +297,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { let block_num = if let Some(num) = request.block_num { num.into() } else { - self.state.latest_block_num().await + self.state.chain_tip(Finality::Committed).await }; // Retrieve the storage map witness. @@ -313,7 +314,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { key: Some(map_key.into()), proof: Some(proof.into()), }), - block_num: self.state.latest_block_num().await.as_u32(), + block_num: self.state.chain_tip(Finality::Committed).await.as_u32(), })) } } diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index 38fae3670..3c2748ba2 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -59,13 +59,16 @@ impl ProofTaskJoinSet { db: &Arc, block_prover: &Arc, block_store: &Arc, + proven_tip_sender: &watch::Sender, block_num: BlockNumber, ) { let db = Arc::clone(db); let block_prover = Arc::clone(block_prover); let block_store = Arc::clone(block_store); - self.0 - .spawn(async move { prove_block(&db, &block_prover, &block_store, block_num).await }); + let proven_tip_sender = proven_tip_sender.clone(); + self.0.spawn(async move { + prove_block(&db, &block_prover, &block_store, &proven_tip_sender, block_num).await + }); } /// Returns the result of the next completed task, or pends forever if the set is empty. @@ -98,9 +101,17 @@ pub fn spawn( block_prover: Arc, block_store: Arc, chain_tip_rx: watch::Receiver, + proven_tip_sender: watch::Sender, max_concurrent_proofs: NonZeroUsize, ) -> JoinHandle> { - tokio::spawn(run(db, block_prover, block_store, chain_tip_rx, max_concurrent_proofs)) + tokio::spawn(run( + db, + block_prover, + block_store, + chain_tip_rx, + proven_tip_sender, + max_concurrent_proofs, + )) } /// Main loop of the proof scheduler. @@ -117,6 +128,7 @@ async fn run( block_prover: Arc, block_store: Arc, mut chain_tip_rx: watch::Receiver, + proven_tip_sender: watch::Sender, max_concurrent_proofs: NonZeroUsize, ) -> anyhow::Result<()> { info!(target: COMPONENT, "Proof scheduler started"); @@ -140,7 +152,7 @@ async fn run( } for block_num in unproven { - join_set.spawn(&db, &block_prover, &block_store, block_num); + join_set.spawn(&db, &block_prover, &block_store, &proven_tip_sender, block_num); } } @@ -171,6 +183,7 @@ async fn prove_block( db: &Db, block_prover: &BlockProver, block_store: &BlockStore, + proven_tip_sender: &watch::Sender, block_num: BlockNumber, ) -> anyhow::Result<()> { const MAX_RETRIES: u32 = 10; @@ -189,6 +202,7 @@ async fn prove_block( // Mark the block as proven and advance the sequence in the database. let advanced_in_sequence = db.mark_proven_and_advance_sequence(block_num).await?; if let Some(&last) = advanced_in_sequence.last() { + proven_tip_sender.send(last)?; info!( target = COMPONENT, block.number = %block_num, diff --git a/crates/store/src/server/rpc_api.rs b/crates/store/src/server/rpc_api.rs index 2ef33295e..448e4d0e4 100644 --- a/crates/store/src/server/rpc_api.rs +++ b/crates/store/src/server/rpc_api.rs @@ -41,6 +41,18 @@ use crate::server::api::{ read_root, validate_nullifiers, }; +use crate::state::Finality; + +impl From for Finality { + fn from(finality: proto::rpc::Finality) -> Self { + match finality { + proto::rpc::Finality::Unspecified | proto::rpc::Finality::Committed => { + Finality::Committed + }, + proto::rpc::Finality::Proven => Finality::Proven, + } + } +} // CLIENT ENDPOINTS // ================================================================================================ @@ -94,7 +106,7 @@ impl rpc_server::Rpc for StoreApi { return Err(SyncNullifiersError::InvalidPrefixLength(request.prefix_len).into()); } - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::(request.block_range, "SyncNullifiersRequest")? .into_inclusive_range::(&chain_tip)?; @@ -129,7 +141,7 @@ impl rpc_server::Rpc for StoreApi { ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::(request.block_range, "SyncNotesRequest")? .into_inclusive_range::(&chain_tip)?; @@ -159,31 +171,20 @@ impl rpc_server::Rpc for StoreApi { request: Request, ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; let block_range = request .block_range .ok_or_else(|| proto::rpc::SyncChainMmrRequest::missing_field(stringify!(block_range))) .map_err(SyncChainMmrError::DeserializationFailed)?; - // Determine the effective tip based on the requested finality level. - let effective_tip = match request.finality() { - proto::rpc::Finality::Unspecified | proto::rpc::Finality::Committed => chain_tip, - proto::rpc::Finality::Proven => self - .state - .db() - .select_latest_proven_in_sequence_block_num() - .await - .map_err(SyncChainMmrError::DatabaseError)?, - }; + let chain_tip = self.state.chain_tip(request.finality().into()).await; let block_from = BlockNumber::from(block_range.block_from); - if block_from > effective_tip { - Err(SyncChainMmrError::FutureBlock { chain_tip: effective_tip, block_from })?; + if block_from > chain_tip { + Err(SyncChainMmrError::FutureBlock { chain_tip, block_from })?; } - let block_to = - block_range.block_to.map_or(effective_tip, BlockNumber::from).min(effective_tip); + let block_to = block_range.block_to.map_or(chain_tip, BlockNumber::from).min(chain_tip); if block_from > block_to { Err(SyncChainMmrError::InvalidBlockRange(InvalidBlockRange::StartGreaterThanEnd { @@ -270,7 +271,7 @@ impl rpc_server::Rpc for StoreApi { request: Request, ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let account_id: AccountId = read_account_id::(request.account_id)?; @@ -326,7 +327,7 @@ impl rpc_server::Rpc for StoreApi { Err(SyncAccountStorageMapsError::AccountNotPublic(account_id))?; } - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::( request.block_range, "SyncAccountStorageMapsRequest", @@ -366,7 +367,7 @@ impl rpc_server::Rpc for StoreApi { Ok(Response::new(proto::rpc::StoreStatus { version: env!("CARGO_PKG_VERSION").to_string(), status: "connected".to_string(), - chain_tip: self.state.latest_block_num().await.as_u32(), + chain_tip: self.state.chain_tip(Finality::Committed).await.as_u32(), })) } @@ -398,7 +399,7 @@ impl rpc_server::Rpc for StoreApi { let request = request.into_inner(); - let chain_tip = self.state.latest_block_num().await; + let chain_tip = self.state.chain_tip(Finality::Committed).await; let block_range = read_block_range::( request.block_range, "SyncTransactionsRequest", diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index 418c890de..a2f1d5e3e 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -34,7 +34,7 @@ use miden_protocol::crypto::merkle::mmr::{MmrPeaks, MmrProof, PartialMmr}; use miden_protocol::crypto::merkle::smt::{LargeSmt, SmtProof, SmtStorage}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; use miden_protocol::transaction::PartialBlockchain; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock, watch}; use tracing::{info, instrument}; use crate::account_state_forest::{AccountStateForest, WitnessError}; @@ -69,6 +69,18 @@ use loader::{ mod apply_block; mod sync_state; +// FINALITY +// ================================================================================================ + +/// The finality level for chain tip queries. +#[derive(Debug, Clone, Copy)] +pub enum Finality { + /// The latest committed (but not necessarily proven) block. + Committed, + /// The latest block that has been proven in an unbroken sequence from genesis. + Proven, +} + // STRUCTURES // ================================================================================================ @@ -125,6 +137,9 @@ pub struct State { /// Request termination of the process due to a fatal internal state error. termination_ask: tokio::sync::mpsc::Sender, + + /// The latest proven-in-sequence block number, updated by the proof scheduler. + proven_tip: watch::Receiver, } impl State { @@ -137,7 +152,7 @@ impl State { data_path: &Path, storage_options: StorageOptions, termination_ask: tokio::sync::mpsc::Sender, - ) -> Result { + ) -> Result<(Self, watch::Sender), StateInitializationError> { let data_directory = DataDirectory::load(data_path.to_path_buf()) .map_err(StateInitializationError::DataDirectoryLoadError)?; @@ -183,14 +198,25 @@ impl State { let writer = Mutex::new(()); let db = Arc::new(db); - Ok(Self { - db, - block_store, - inner, - forest, - writer, - termination_ask, - }) + // Initialize the proven tip from database. + let proven_tip = db + .select_latest_proven_in_sequence_block_num() + .await + .map_err(StateInitializationError::DatabaseError)?; + let (proven_tip_sender, proven_tip) = watch::channel(proven_tip); + + Ok(( + Self { + db, + block_store, + inner, + forest, + writer, + termination_ask, + proven_tip, + }, + proven_tip_sender, + )) } /// Returns the database. @@ -264,7 +290,7 @@ impl State { ) -> Result, GetCurrentBlockchainDataError> { let blockchain = &self.inner.read().await.blockchain; if let Some(number) = block_num - && number == self.latest_block_num().await + && number == self.chain_tip(Finality::Committed).await { return Ok(None); } @@ -836,15 +862,22 @@ impl State { &self, block_num: BlockNumber, ) -> Result>, DatabaseError> { - if block_num > self.latest_block_num().await { + if block_num > self.chain_tip(Finality::Committed).await { return Ok(None); } self.block_store.load_block(block_num).await.map_err(Into::into) } - /// Returns the latest block number. - pub async fn latest_block_num(&self) -> BlockNumber { - self.inner.read().await.latest_block_num() + /// Returns the effective chain tip for the given finality level. + /// + /// - [`Finality::Committed`]: returns the latest committed block number (from in-memory MMR). + /// - [`Finality::Proven`]: returns the latest proven-in-sequence block number (cached via watch + /// channel, updated by the proof scheduler). + pub async fn chain_tip(&self, finality: Finality) -> BlockNumber { + match finality { + Finality::Committed => self.inner.read().await.latest_block_num(), + Finality::Proven => *self.proven_tip.borrow(), + } } /// Emits metrics for each database table's size. From f4cb0e82e86f561900cf1d0e7aedcdf17926c686 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 30 Mar 2026 16:16:23 +1300 Subject: [PATCH 02/10] Split up Store::serve() --- crates/store/src/server/mod.rs | 113 ++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 38 deletions(-) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 3ef35d1a7..07cc465a5 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -100,39 +100,95 @@ impl Store { .context("failed to load state")?; let state = Arc::new(state); - // Initialize local or remote block prover. - let block_prover = if let Some(url) = self.block_prover_url { + let (proof_scheduler_task, chain_tip_sender) = Self::spawn_proof_scheduler( + &state, + self.block_prover_url, + self.max_concurrent_proofs, + proven_tip_sender, + ) + .await; + + let mut join_set = Self::spawn_grpc_servers( + &state, + chain_tip_sender, + self.grpc_options, + self.rpc_listener, + self.ntx_builder_listener, + self.block_producer_listener, + )?; + + let service = async move { + join_set.join_next().await.expect("joinset is not empty")?.map_err(Into::into) + }; + tokio::select! { + result = service => result, + Some(err) = termination_signal.recv() => { + Err(anyhow::anyhow!("received termination signal").context(err)) + }, + result = proof_scheduler_task => { + match result { + Ok(Ok(())) => Err(anyhow::anyhow!("proof scheduler exited unexpectedly")), + Ok(Err(err)) => Err(err.context("proof scheduler fatal error")), + Err(join_err) => Err(join_err).context("proof scheduler panicked"), + } + } + } + } + + /// Initializes the block prover client and spawns the proof scheduler as a background task. + /// + /// Returns the scheduler task handle and the chain tip sender (needed by gRPC services to + /// notify the scheduler of new blocks). + async fn spawn_proof_scheduler( + state: &Arc, + block_prover_url: Option, + max_concurrent_proofs: NonZeroUsize, + proven_tip_sender: tokio::sync::watch::Sender, + ) -> ( + tokio::task::JoinHandle>, + tokio::sync::watch::Sender, + ) { + let block_prover = if let Some(url) = block_prover_url { Arc::new(BlockProver::remote(url)) } else { Arc::new(BlockProver::local()) }; - // Initialize the chain tip watch channel. let chain_tip = state.chain_tip(crate::state::Finality::Committed).await; let (chain_tip_sender, chain_tip_rx) = tokio::sync::watch::channel(chain_tip); - // Spawn the proof scheduler as a background task. It will immediately pick up any - // unproven blocks from previous runs and begin proving them. - let proof_scheduler_task = proof_scheduler::spawn( + let handle = proof_scheduler::spawn( state.db().clone(), block_prover, state.block_store(), chain_tip_rx, proven_tip_sender, - self.max_concurrent_proofs, + max_concurrent_proofs, ); + (handle, chain_tip_sender) + } + + /// Spawns the gRPC servers and the DB maintenance background task. + fn spawn_grpc_servers( + state: &Arc, + chain_tip_sender: tokio::sync::watch::Sender, + grpc_options: GrpcOptionsInternal, + rpc_listener: TcpListener, + ntx_builder_listener: TcpListener, + block_producer_listener: TcpListener, + ) -> anyhow::Result>> { let rpc_service = store::rpc_server::RpcServer::new(api::StoreApi { - state: Arc::clone(&state), + state: Arc::clone(state), chain_tip_sender: chain_tip_sender.clone(), }); let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi { - state: Arc::clone(&state), + state: Arc::clone(state), chain_tip_sender: chain_tip_sender.clone(), }); let block_producer_service = store::block_producer_server::BlockProducerServer::new(api::StoreApi { - state: Arc::clone(&state), + state: Arc::clone(state), chain_tip_sender, }); let reflection_service = tonic_reflection::server::Builder::configure() @@ -146,66 +202,47 @@ impl Store { let mut join_set = JoinSet::new(); + let state_clone = Arc::clone(state); join_set.spawn(async move { - // Manual tests on testnet indicate each iteration takes ~2s once things are OS cached. - // - // 5 minutes seems like a reasonable interval, where this should have minimal database - // IO impact while providing a decent view into table growth over time. let mut interval = tokio::time::interval(Duration::from_secs(5 * 60)); - let database = Arc::clone(&state); loop { interval.tick().await; - let _ = database.analyze_table_sizes().await; + let _ = state_clone.analyze_table_sizes().await; } }); - // Build the gRPC server with the API services and trace layer. join_set.spawn( tonic::transport::Server::builder() - .timeout(self.grpc_options.request_timeout) + .timeout(grpc_options.request_timeout) .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .add_service(rpc_service) .add_service(reflection_service.clone()) - .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)), + .serve_with_incoming(TcpListenerStream::new(rpc_listener)), ); join_set.spawn( tonic::transport::Server::builder() - .timeout(self.grpc_options.request_timeout) + .timeout(grpc_options.request_timeout) .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .add_service(ntx_builder_service) .add_service(reflection_service.clone()) - .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)), + .serve_with_incoming(TcpListenerStream::new(ntx_builder_listener)), ); join_set.spawn( tonic::transport::Server::builder() .accept_http1(true) - .timeout(self.grpc_options.request_timeout) + .timeout(grpc_options.request_timeout) .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .add_service(block_producer_service) .add_service(reflection_service) - .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)), + .serve_with_incoming(TcpListenerStream::new(block_producer_listener)), ); - // SAFETY: The joinset is definitely not empty. - let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) }; - tokio::select! { - result = service => result, - Some(err) = termination_signal.recv() => { - Err(anyhow::anyhow!("received termination signal").context(err)) - }, - result = proof_scheduler_task => { - match result { - Ok(Ok(())) => Err(anyhow::anyhow!("proof scheduler exited unexpectedly")), - Ok(Err(err)) => Err(err.context("proof scheduler fatal error")), - Err(join_err) => Err(join_err).context("proof scheduler panicked"), - } - } - } + Ok(join_set) } } From 4c25e43856c7e7dad06a6f1612c4474cc5d515cf Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 30 Mar 2026 16:21:33 +1300 Subject: [PATCH 03/10] RM import prefix for watch --- crates/store/src/server/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 07cc465a5..7300c4e0b 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -15,6 +15,7 @@ use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn}; use miden_node_utils::tracing::grpc::grpc_trace_fn; use tokio::net::TcpListener; +use tokio::sync::watch; use tokio::task::JoinSet; use tokio_stream::wrappers::TcpListenerStream; use tower_http::trace::TraceLayer; @@ -143,10 +144,10 @@ impl Store { state: &Arc, block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, - proven_tip_sender: tokio::sync::watch::Sender, + proven_tip_sender: watch::Sender, ) -> ( tokio::task::JoinHandle>, - tokio::sync::watch::Sender, + watch::Sender, ) { let block_prover = if let Some(url) = block_prover_url { Arc::new(BlockProver::remote(url)) @@ -155,7 +156,7 @@ impl Store { }; let chain_tip = state.chain_tip(crate::state::Finality::Committed).await; - let (chain_tip_sender, chain_tip_rx) = tokio::sync::watch::channel(chain_tip); + let (chain_tip_sender, chain_tip_rx) = watch::channel(chain_tip); let handle = proof_scheduler::spawn( state.db().clone(), @@ -172,7 +173,7 @@ impl Store { /// Spawns the gRPC servers and the DB maintenance background task. fn spawn_grpc_servers( state: &Arc, - chain_tip_sender: tokio::sync::watch::Sender, + chain_tip_sender: watch::Sender, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, ntx_builder_listener: TcpListener, From 961f767a4d8f4633530f5109fb3ff6668285d6a5 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 30 Mar 2026 16:23:46 +1300 Subject: [PATCH 04/10] Simplify arc logic --- crates/store/src/server/mod.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 7300c4e0b..900272d50 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -110,7 +110,7 @@ impl Store { .await; let mut join_set = Self::spawn_grpc_servers( - &state, + state, chain_tip_sender, self.grpc_options, self.rpc_listener, @@ -172,7 +172,7 @@ impl Store { /// Spawns the gRPC servers and the DB maintenance background task. fn spawn_grpc_servers( - state: &Arc, + state: Arc, chain_tip_sender: watch::Sender, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, @@ -180,16 +180,16 @@ impl Store { block_producer_listener: TcpListener, ) -> anyhow::Result>> { let rpc_service = store::rpc_server::RpcServer::new(api::StoreApi { - state: Arc::clone(state), + state: Arc::clone(&state), chain_tip_sender: chain_tip_sender.clone(), }); let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi { - state: Arc::clone(state), + state: Arc::clone(&state), chain_tip_sender: chain_tip_sender.clone(), }); let block_producer_service = store::block_producer_server::BlockProducerServer::new(api::StoreApi { - state: Arc::clone(state), + state: Arc::clone(&state), chain_tip_sender, }); let reflection_service = tonic_reflection::server::Builder::configure() @@ -203,12 +203,11 @@ impl Store { let mut join_set = JoinSet::new(); - let state_clone = Arc::clone(state); join_set.spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(5 * 60)); loop { interval.tick().await; - let _ = state_clone.analyze_table_sizes().await; + let _ = state.analyze_table_sizes().await; } }); From a03aad501d5e1de5923acdab7b0d7ba33087743b Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 30 Mar 2026 16:25:23 +1300 Subject: [PATCH 05/10] reinstate comment --- crates/store/src/server/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 900272d50..ba7ce5c99 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -204,7 +204,11 @@ impl Store { let mut join_set = JoinSet::new(); join_set.spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(5 * 60)); + // Manual tests on testnet indicate each iteration takes ~2s once things are OS cached. + // + // 5 minutes seems like a reasonable interval, where this should have minimal database + // IO impact while providing a decent view into table growth over time. + let mut interval = tokio::time::interval(Duration::from_mins(5)); loop { interval.tick().await; let _ = state.analyze_table_sizes().await; From 88bd62a8714ba366cbb26537448298fba27621bf Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 31 Mar 2026 10:23:21 +1300 Subject: [PATCH 06/10] Fix arc usage --- crates/store/src/server/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index ba7ce5c99..a41e34d6d 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -99,7 +99,6 @@ impl Store { State::load(&self.data_directory, self.storage_options, termination_ask) .await .context("failed to load state")?; - let state = Arc::new(state); let (proof_scheduler_task, chain_tip_sender) = Self::spawn_proof_scheduler( &state, @@ -141,7 +140,7 @@ impl Store { /// Returns the scheduler task handle and the chain tip sender (needed by gRPC services to /// notify the scheduler of new blocks). async fn spawn_proof_scheduler( - state: &Arc, + state: &State, block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, proven_tip_sender: watch::Sender, @@ -172,13 +171,14 @@ impl Store { /// Spawns the gRPC servers and the DB maintenance background task. fn spawn_grpc_servers( - state: Arc, + state: State, chain_tip_sender: watch::Sender, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, ntx_builder_listener: TcpListener, block_producer_listener: TcpListener, ) -> anyhow::Result>> { + let state = Arc::new(state); let rpc_service = store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state), chain_tip_sender: chain_tip_sender.clone(), From afbebc75fa74fb4a9637b32c3cd93de988553f66 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 31 Mar 2026 10:25:09 +1300 Subject: [PATCH 07/10] move tip sender --- crates/store/src/server/proof_scheduler.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index 3c2748ba2..ba26704d4 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -65,9 +65,11 @@ impl ProofTaskJoinSet { let db = Arc::clone(db); let block_prover = Arc::clone(block_prover); let block_store = Arc::clone(block_store); - let proven_tip_sender = proven_tip_sender.clone(); - self.0.spawn(async move { - prove_block(&db, &block_prover, &block_store, &proven_tip_sender, block_num).await + self.0.spawn({ + let proven_tip_sender = proven_tip_sender.clone(); + async move { + prove_block(&db, &block_prover, &block_store, &proven_tip_sender, block_num).await + } }); } From 4ba9b1eeda337252dae5e7b450af5e9eeed40a63 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 31 Mar 2026 10:33:44 +1300 Subject: [PATCH 08/10] rename sender recvr vars --- crates/store/src/server/mod.rs | 12 ++++++------ crates/store/src/server/proof_scheduler.rs | 18 +++++++++--------- crates/store/src/state/mod.rs | 10 +++++----- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index a41e34d6d..e93b0f4b8 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -95,7 +95,7 @@ impl Store { let (termination_ask, mut termination_signal) = tokio::sync::mpsc::channel::(1); - let (state, proven_tip_sender) = + let (state, tx_proven_tip) = State::load(&self.data_directory, self.storage_options, termination_ask) .await .context("failed to load state")?; @@ -104,7 +104,7 @@ impl Store { &state, self.block_prover_url, self.max_concurrent_proofs, - proven_tip_sender, + tx_proven_tip, ) .await; @@ -143,7 +143,7 @@ impl Store { state: &State, block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, - proven_tip_sender: watch::Sender, + proven_tip_tx: watch::Sender, ) -> ( tokio::task::JoinHandle>, watch::Sender, @@ -155,18 +155,18 @@ impl Store { }; let chain_tip = state.chain_tip(crate::state::Finality::Committed).await; - let (chain_tip_sender, chain_tip_rx) = watch::channel(chain_tip); + let (chain_tip_tx, chain_tip_rx) = watch::channel(chain_tip); let handle = proof_scheduler::spawn( state.db().clone(), block_prover, state.block_store(), chain_tip_rx, - proven_tip_sender, + proven_tip_tx, max_concurrent_proofs, ); - (handle, chain_tip_sender) + (handle, chain_tip_tx) } /// Spawns the gRPC servers and the DB maintenance background task. diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index ba26704d4..b7e4d6258 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -59,16 +59,16 @@ impl ProofTaskJoinSet { db: &Arc, block_prover: &Arc, block_store: &Arc, - proven_tip_sender: &watch::Sender, + proven_tip_tx: &watch::Sender, block_num: BlockNumber, ) { let db = Arc::clone(db); let block_prover = Arc::clone(block_prover); let block_store = Arc::clone(block_store); self.0.spawn({ - let proven_tip_sender = proven_tip_sender.clone(); + let proven_tip_tx = proven_tip_tx.clone(); async move { - prove_block(&db, &block_prover, &block_store, &proven_tip_sender, block_num).await + prove_block(&db, &block_prover, &block_store, &proven_tip_tx, block_num).await } }); } @@ -103,7 +103,7 @@ pub fn spawn( block_prover: Arc, block_store: Arc, chain_tip_rx: watch::Receiver, - proven_tip_sender: watch::Sender, + proven_tip_tx: watch::Sender, max_concurrent_proofs: NonZeroUsize, ) -> JoinHandle> { tokio::spawn(run( @@ -111,7 +111,7 @@ pub fn spawn( block_prover, block_store, chain_tip_rx, - proven_tip_sender, + proven_tip_tx, max_concurrent_proofs, )) } @@ -130,7 +130,7 @@ async fn run( block_prover: Arc, block_store: Arc, mut chain_tip_rx: watch::Receiver, - proven_tip_sender: watch::Sender, + proven_tip_tx: watch::Sender, max_concurrent_proofs: NonZeroUsize, ) -> anyhow::Result<()> { info!(target: COMPONENT, "Proof scheduler started"); @@ -154,7 +154,7 @@ async fn run( } for block_num in unproven { - join_set.spawn(&db, &block_prover, &block_store, &proven_tip_sender, block_num); + join_set.spawn(&db, &block_prover, &block_store, &proven_tip_tx, block_num); } } @@ -185,7 +185,7 @@ async fn prove_block( db: &Db, block_prover: &BlockProver, block_store: &BlockStore, - proven_tip_sender: &watch::Sender, + proven_tip_tx: &watch::Sender, block_num: BlockNumber, ) -> anyhow::Result<()> { const MAX_RETRIES: u32 = 10; @@ -204,7 +204,7 @@ async fn prove_block( // Mark the block as proven and advance the sequence in the database. let advanced_in_sequence = db.mark_proven_and_advance_sequence(block_num).await?; if let Some(&last) = advanced_in_sequence.last() { - proven_tip_sender.send(last)?; + proven_tip_tx.send(last)?; info!( target = COMPONENT, block.number = %block_num, diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index a2f1d5e3e..ac515bebf 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -139,7 +139,7 @@ pub struct State { termination_ask: tokio::sync::mpsc::Sender, /// The latest proven-in-sequence block number, updated by the proof scheduler. - proven_tip: watch::Receiver, + proven_tip_rx: watch::Receiver, } impl State { @@ -203,7 +203,7 @@ impl State { .select_latest_proven_in_sequence_block_num() .await .map_err(StateInitializationError::DatabaseError)?; - let (proven_tip_sender, proven_tip) = watch::channel(proven_tip); + let (proven_tip_tx, proven_tip_rx) = watch::channel(proven_tip); Ok(( Self { @@ -213,9 +213,9 @@ impl State { forest, writer, termination_ask, - proven_tip, + proven_tip_rx, }, - proven_tip_sender, + proven_tip_tx, )) } @@ -876,7 +876,7 @@ impl State { pub async fn chain_tip(&self, finality: Finality) -> BlockNumber { match finality { Finality::Committed => self.inner.read().await.latest_block_num(), - Finality::Proven => *self.proven_tip.borrow(), + Finality::Proven => *self.proven_tip_rx.borrow(), } } From f3714421856923a9d6a164d80faae9c690c851e7 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 31 Mar 2026 10:35:09 +1300 Subject: [PATCH 09/10] rename to select_proven_tip --- crates/store/src/db/mod.rs | 2 +- crates/store/src/server/proof_scheduler.rs | 2 +- crates/store/src/state/mod.rs | 6 ++---- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 51e487cab..5b459f18d 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -606,7 +606,7 @@ impl Db { /// /// This includes the genesis block, which is not technically proven, but treated as such. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] - pub async fn select_latest_proven_in_sequence_block_num(&self) -> Result { + pub async fn select_proven_tip(&self) -> Result { self.transact("select latest proven block num", |conn| { models::queries::select_latest_proven_in_sequence_block_num(conn) }) diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index b7e4d6258..d2b60e11d 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -141,7 +141,7 @@ async fn run( // Highest block number that is in-flight or has been proven. Used to avoid re-querying // blocks we've already scheduled. Initialized from the in-sequence tip so we skip // already-proven blocks on restart. - let mut highest_scheduled = db.select_latest_proven_in_sequence_block_num().await?; + let mut highest_scheduled = db.select_proven_tip().await?; loop { // Query the DB for unproven blocks beyond what we've already scheduled. diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index ac515bebf..1d9142d31 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -199,10 +199,8 @@ impl State { let db = Arc::new(db); // Initialize the proven tip from database. - let proven_tip = db - .select_latest_proven_in_sequence_block_num() - .await - .map_err(StateInitializationError::DatabaseError)?; + let proven_tip = + db.select_proven_tip().await.map_err(StateInitializationError::DatabaseError)?; let (proven_tip_tx, proven_tip_rx) = watch::channel(proven_tip); Ok(( From c45b0dc36dd647f787158d32d5ec084a43cbee57 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 31 Mar 2026 10:35:26 +1300 Subject: [PATCH 10/10] rename to proven_chain_tip --- crates/store/src/db/mod.rs | 2 +- crates/store/src/server/proof_scheduler.rs | 2 +- crates/store/src/state/mod.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 5b459f18d..995aaca10 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -606,7 +606,7 @@ impl Db { /// /// This includes the genesis block, which is not technically proven, but treated as such. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] - pub async fn select_proven_tip(&self) -> Result { + pub async fn proven_chain_tip(&self) -> Result { self.transact("select latest proven block num", |conn| { models::queries::select_latest_proven_in_sequence_block_num(conn) }) diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index d2b60e11d..ba3275584 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -141,7 +141,7 @@ async fn run( // Highest block number that is in-flight or has been proven. Used to avoid re-querying // blocks we've already scheduled. Initialized from the in-sequence tip so we skip // already-proven blocks on restart. - let mut highest_scheduled = db.select_proven_tip().await?; + let mut highest_scheduled = db.proven_chain_tip().await?; loop { // Query the DB for unproven blocks beyond what we've already scheduled. diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index 1d9142d31..38df4b14a 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -200,7 +200,7 @@ impl State { // Initialize the proven tip from database. let proven_tip = - db.select_proven_tip().await.map_err(StateInitializationError::DatabaseError)?; + db.proven_chain_tip().await.map_err(StateInitializationError::DatabaseError)?; let (proven_tip_tx, proven_tip_rx) = watch::channel(proven_tip); Ok((