diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 0a669d2854..85acb62bff 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -128,8 +128,7 @@ impl BlockBuilder { #[instrument(target = COMPONENT, name = "block_builder.select_block", skip_all)] async fn select_block(mempool: &SharedMempool) -> SelectedBlock { - let (block_number, batches) = mempool.lock().await.select_block(); - SelectedBlock { block_number, batches } + mempool.lock().await.select_block() } /// Fetches block inputs from the store for the [`SelectedBlock`]. @@ -280,9 +279,10 @@ impl BlockBuilder { /// A wrapper around batches selected for inlucion in a block, primarily used to be able to inject /// telemetry in-between the selection and fetching the required [`BlockInputs`]. -struct SelectedBlock { - block_number: BlockNumber, - batches: Vec>, +#[derive(Clone, Debug, PartialEq)] +pub struct SelectedBlock { + pub block_number: BlockNumber, + pub batches: Vec>, } impl TelemetryInjectorExt for SelectedBlock { diff --git a/crates/block-producer/src/domain/batch.rs b/crates/block-producer/src/domain/batch.rs index 592a340434..cd6ae0e8b3 100644 --- a/crates/block-producer/src/domain/batch.rs +++ b/crates/block-producer/src/domain/batch.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::batch::BatchId; -use miden_protocol::transaction::TransactionId; +use miden_protocol::block::BlockNumber; use crate::domain::transaction::AuthenticatedTransaction; @@ -22,7 +22,7 @@ use crate::domain::transaction::AuthenticatedTransaction; pub(crate) struct SelectedBatch { txs: Vec>, id: BatchId, - account_updates: HashMap, + account_updates: HashMap)>, } impl SelectedBatch { @@ -43,12 +43,25 @@ impl SelectedBatch { } /// The aggregated list of account transitions this batch causes given as tuples of `(AccountId, - /// initial 
commitment, final commitment)`. + /// initial commitment, final commitment, Option)`. /// /// Note that the updates are aggregated, i.e. only a single update per account is possible, and /// transaction updates to an account of `a -> b -> c` will result in a single `a -> c`. - pub(crate) fn account_updates(&self) -> impl Iterator { - self.account_updates.iter().map(|(account, (from, to))| (*account, *from, *to)) + pub(crate) fn account_updates( + &self, + ) -> impl Iterator)> { + self.account_updates + .iter() + .map(|(account, (from, to, store))| (*account, *from, *to, *store)) + } + + pub(crate) fn expires_at(&self) -> BlockNumber { + self.txs + .iter() + .map(|tx| tx.expires_at().as_u32()) + .min() + .unwrap_or(u32::MAX) + .into() } } @@ -56,7 +69,7 @@ impl SelectedBatch { #[derive(Clone, Default)] pub(crate) struct SelectedBatchBuilder { pub(crate) txs: Vec>, - pub(crate) account_updates: HashMap, + pub(crate) account_updates: HashMap)>, } impl SelectedBatchBuilder { @@ -71,7 +84,7 @@ impl SelectedBatchBuilder { let update = tx.account_update(); self.account_updates .entry(update.account_id()) - .and_modify(|(_, to)| { + .and_modify(|(_from, to, _store)| { assert!( to == &update.initial_state_commitment(), "Cannot select transaction {} as its initial commitment {} for account {} does \ @@ -84,16 +97,15 @@ not match the current commitment {}", *to = update.final_state_commitment(); }) - .or_insert((update.initial_state_commitment(), update.final_state_commitment())); + .or_insert(( + update.initial_state_commitment(), + update.final_state_commitment(), + tx.store_account_state(), + )); self.txs.push(tx); } - /// Returns `true` if the batch contains the given transaction already. - pub(crate) fn contains(&self, target: &TransactionId) -> bool { - self.txs.iter().any(|tx| &tx.id() == target) - } - /// Returns `true` if it contains no transactions. 
pub(crate) fn is_empty(&self) -> bool { self.txs.is_empty() diff --git a/crates/block-producer/src/domain/transaction.rs b/crates/block-producer/src/domain/transaction.rs index 825497f2ed..c67fb64291 100644 --- a/crates/block-producer/src/domain/transaction.rs +++ b/crates/block-producer/src/domain/transaction.rs @@ -7,7 +7,7 @@ use miden_protocol::block::BlockNumber; use miden_protocol::note::{NoteHeader, Nullifier}; use miden_protocol::transaction::{OutputNote, ProvenTransaction, TransactionId, TxAccountUpdate}; -use crate::errors::VerifyTxError; +use crate::errors::StateConflict; use crate::store::TransactionInputs; /// A transaction who's proof has been verified, and which has been authenticated against the store. @@ -48,13 +48,13 @@ impl AuthenticatedTransaction { pub fn new_unchecked( tx: ProvenTransaction, inputs: TransactionInputs, - ) -> Result { + ) -> Result { let nullifiers_already_spent = tx .nullifiers() .filter(|nullifier| inputs.nullifiers.get(nullifier).copied().flatten().is_some()) .collect::>(); if !nullifiers_already_spent.is_empty() { - return Err(VerifyTxError::InputNotesAlreadyConsumed(nullifiers_already_spent)); + return Err(StateConflict::NullifiersAlreadyExist(nullifiers_already_spent)); } Ok(AuthenticatedTransaction { diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index daa798d7f6..2c4fc3745a 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -35,77 +35,14 @@ pub enum BlockProducerError { }, } -// Transaction verification errors -// ================================================================================================= - -#[derive(Debug, Error)] -pub enum VerifyTxError { - /// Another transaction already consumed the notes with given nullifiers - #[error( - "input notes with given nullifiers were already consumed by another transaction: {0:?}" - )] - InputNotesAlreadyConsumed(Vec), - - /// Unauthenticated transaction notes were not found in the 
store or in outputs of in-flight - /// transactions - #[error( - "unauthenticated transaction note commitments were not found in the store or in outputs of in-flight transactions: {0:?}" - )] - UnauthenticatedNotesNotFound(Vec), - - #[error("output note commitments already used: {0:?}")] - OutputNotesAlreadyExist(Vec), - - /// The account's initial commitment did not match the current account's commitment - #[error( - "transaction's initial state commitment {tx_initial_account_commitment} does not match the account's current value of {current_account_commitment}" - )] - IncorrectAccountInitialCommitment { - tx_initial_account_commitment: Word, - current_account_commitment: Word, - }, - - /// Failed to retrieve transaction inputs from the store - /// - /// TODO: Make this an "internal error". Q: Should we have a single `InternalError` enum for - /// all internal errors that can occur across the system? - #[error("failed to retrieve transaction inputs from the store")] - StoreConnectionFailed(#[from] StoreError), - - /// Failed to verify the transaction execution proof - #[error("invalid transaction proof error for transaction: {0}")] - InvalidTransactionProof(TransactionId), -} - // Transaction adding errors // ================================================================================================= #[derive(Debug, Error, GrpcError)] pub enum AddTransactionError { - #[error( - "input notes with given nullifiers were already consumed by another transaction: {0:?}" - )] - InputNotesAlreadyConsumed(Vec), - - #[error( - "unauthenticated transaction note commitments were not found in the store or in outputs of in-flight transactions: {0:?}" - )] - UnauthenticatedNotesNotFound(Vec), - - #[error("output note commitments already used: {0:?}")] - OutputNotesAlreadyExist(Vec), - - #[error( - "transaction's initial state commitment {tx_initial_account_commitment} does not match the account's current value of {current_account_commitment}" - )] - 
IncorrectAccountInitialCommitment { - tx_initial_account_commitment: Word, - current_account_commitment: Word, - }, - #[error("failed to retrieve transaction inputs from the store")] #[grpc(internal)] - StoreConnectionFailed(#[from] StoreError), + StoreConnectionFailed(#[source] StoreError), #[error("invalid transaction proof error for transaction: {0}")] InvalidTransactionProof(TransactionId), @@ -130,35 +67,31 @@ pub enum AddTransactionError { limit: BlockNumber, }, + #[error("transaction conflicts with current mempool state")] + StateConflict(#[source] StateConflict), + #[error("the mempool is at capacity")] CapacityExceeded, } -impl From for AddTransactionError { - fn from(err: VerifyTxError) -> Self { - match err { - VerifyTxError::InputNotesAlreadyConsumed(nullifiers) => { - Self::InputNotesAlreadyConsumed(nullifiers) - }, - VerifyTxError::UnauthenticatedNotesNotFound(note_commitments) => { - Self::UnauthenticatedNotesNotFound(note_commitments) - }, - VerifyTxError::OutputNotesAlreadyExist(note_commitments) => { - Self::OutputNotesAlreadyExist(note_commitments) - }, - VerifyTxError::IncorrectAccountInitialCommitment { - tx_initial_account_commitment, - current_account_commitment, - } => Self::IncorrectAccountInitialCommitment { - tx_initial_account_commitment, - current_account_commitment, - }, - VerifyTxError::StoreConnectionFailed(store_err) => { - Self::StoreConnectionFailed(store_err) - }, - VerifyTxError::InvalidTransactionProof(tx_id) => Self::InvalidTransactionProof(tx_id), - } - } +// Submitted transaction conflicts with current state +// ================================================================================================= +#[derive(Debug, Error, PartialEq, Eq)] +pub enum StateConflict { + #[error("nullifiers already exist: {0:?}")] + NullifiersAlreadyExist(Vec), + #[error("output notes already exist: {0:?}")] + OutputNotesAlreadyExist(Vec), + #[error("unauthenticated input notes are unknown: {0:?}")] + UnauthenticatedNotesMissing(Vec), + 
#[error( + "initial account commitment {expected} does not match the current commitment {current} for account {account}" + )] + AccountCommitmentMismatch { + account: AccountId, + expected: Word, + current: Word, + }, } // Submit proven batch by user errors diff --git a/crates/block-producer/src/mempool/budget.rs b/crates/block-producer/src/mempool/budget.rs index 0a3669ae12..a4c9c2167d 100644 --- a/crates/block-producer/src/mempool/budget.rs +++ b/crates/block-producer/src/mempool/budget.rs @@ -57,7 +57,7 @@ impl BatchBudget { /// Attempts to consume the transaction's resources from the budget. /// /// Returns [`BudgetStatus::Exceeded`] if the transaction would exceed the remaining budget, - /// otherwise returns [`BudgetStatus::Ok`] and subtracts the resources from the budget. + /// otherwise returns [`BudgetStatus::WithinScope`] and subtracts the resources from the budget. #[must_use] pub(crate) fn check_then_subtract(&mut self, tx: &AuthenticatedTransaction) -> BudgetStatus { // This type assertion reminds us to update the account check if we ever support @@ -89,7 +89,7 @@ impl BlockBudget { /// Attempts to consume the batch's resources from the budget. /// /// Returns [`BudgetStatus::Exceeded`] if the batch would exceed the remaining budget, - /// otherwise returns [`BudgetStatus::Ok`]. + /// otherwise returns [`BudgetStatus::WithinScope`]. 
#[must_use] pub(crate) fn check_then_subtract(&mut self, _batch: &ProvenBatch) -> BudgetStatus { if self.batches == 0 { diff --git a/crates/block-producer/src/mempool/graph/batch.rs b/crates/block-producer/src/mempool/graph/batch.rs new file mode 100644 index 0000000000..678ce05902 --- /dev/null +++ b/crates/block-producer/src/mempool/graph/batch.rs @@ -0,0 +1,168 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use miden_protocol::Word; +use miden_protocol::account::AccountId; +use miden_protocol::batch::{BatchId, ProvenBatch}; +use miden_protocol::block::BlockNumber; +use miden_protocol::note::Nullifier; + +use crate::domain::batch::SelectedBatch; +use crate::errors::StateConflict; +use crate::mempool::BlockBudget; +use crate::mempool::budget::BudgetStatus; +use crate::mempool::graph::dag::Graph; +use crate::mempool::graph::node::GraphNode; + +// BATCH IMPL FOR GRAPH NODE +// ================================================================================================ + +impl GraphNode for SelectedBatch { + type Id = BatchId; + + fn nullifiers(&self) -> Box + '_> { + Box::new(self.txs().iter().flat_map(|tx| tx.nullifiers())) + } + + fn output_notes(&self) -> Box + '_> { + Box::new(self.txs().iter().flat_map(|tx| tx.output_note_commitments())) + } + + fn unauthenticated_notes(&self) -> Box + '_> { + Box::new(self.txs().iter().flat_map(|tx| tx.unauthenticated_note_commitments())) + } + + fn account_updates( + &self, + ) -> Box)> + '_> { + Box::new(self.account_updates()) + } + + fn id(&self) -> Self::Id { + self.id() + } + + fn expires_at(&self) -> BlockNumber { + self.expires_at() + } +} + +// BATCH GRAPH +// ================================================================================================ + +/// Tracks [`SelectedBatch`] instances that are pending proof generation. +/// +/// Batches form nodes in the underlying [`Graph`]. 
Edges between batches capture dependencies +/// introduced by shared resources (nullifiers, notes, and account states). The graph remains a DAG +/// by requiring that each batch builds on top of the state created by previously inserted batches. +#[derive(Clone, Debug, PartialEq, Default)] +pub struct BatchGraph { + inner: Graph, + proven: HashMap>, +} + +impl BatchGraph { + /// Inserts the batch into the dependency graph. + /// + /// # Errors + /// + /// Returns an error if the batch's state conflicts with the current graph view (e.g. it + /// consumes a nullifier that was already spent). + pub fn append(&mut self, batch: SelectedBatch) -> Result<(), StateConflict> { + self.inner.append(batch) + } + + /// Reverts the given batch and _all_ its descendants _IFF_ it is present in the graph. + /// + /// This includes batches that have been marked as proven. + /// + /// Returns the reverted batches in the _reverse_ chronological order they were appended in. + pub fn revert_batch_and_descendants(&mut self, batch: BatchId) -> Vec { + // We need this check because `inner.revert..` panics if the node is unknown. + if !self.inner.contains(&batch) { + return Vec::default(); + } + + let reverted = self.inner.revert_node_and_descendants(batch); + for batch in &reverted { + self.proven.remove(&batch.id()); + } + + reverted + } + + /// Reverts expired batches and their descendants. + /// + /// Only unselected batches are considered, the assumption being that selected batches + /// are in committed blocks and should not be reverted. + /// + /// Batches are returned in reverse-chronological order. + pub fn revert_expired(&mut self, chain_tip: BlockNumber) -> Vec { + let reverted = self.inner.revert_expired_unselected(chain_tip); + + for batch in &reverted { + self.proven.remove(&batch.id()); + } + + reverted + } + + /// Marks the given batch as proven, making it available for selection in a block + /// once it becomes a root. 
+ pub fn submit_proof(&mut self, proof: Arc) { + if self.inner.contains(&proof.id()) { + self.proven.insert(proof.id(), proof); + } + } + + /// Returns `true` if the batch has been proven previously. + pub fn is_proven(&mut self, batch: &BatchId) -> bool { + self.proven.contains_key(batch) + } + + /// Selects a set of batches for inclusion in the next block. + /// + /// A batch is available for selection if: + /// - all the batches it depends on have been selected for a previous block, or are selected in + /// this block as well, and + /// - the batch has had a proof submitted + pub fn select_block(&mut self, mut budget: BlockBudget) -> Vec> { + let mut selected = Vec::default(); + + // Only batches which are proven can be selected for inclusion in a block. + while let Some(candidate) = + self.inner.selection_candidates().iter().find_map(|(id, _)| self.proven.get(id)) + { + if budget.check_then_subtract(candidate) == BudgetStatus::Exceeded { + break; + } + + self.inner.select_candidate(candidate.id()); + selected.push(Arc::clone(candidate)); + } + + selected + } + + /// Prunes the given batch. + /// + /// # Panics + /// + /// Panics if the batch does not exist, or has existing ancestors in the batch + /// graph. 
+ pub fn prune(&mut self, batch: BatchId) { + self.inner.prune(batch); + self.proven.remove(&batch); + } + + pub fn proven_count(&self) -> usize { + self.proven.len() + } + + pub fn proposed_count(&self) -> usize { + self.inner + .node_count() + .checked_sub(self.proven_count()) + .expect("proven batches cannot exceed total batches") + } +} diff --git a/crates/block-producer/src/mempool/graph/dag.rs b/crates/block-producer/src/mempool/graph/dag.rs new file mode 100644 index 0000000000..2e10ef05a5 --- /dev/null +++ b/crates/block-producer/src/mempool/graph/dag.rs @@ -0,0 +1,360 @@ +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::hash::Hash; + +use miden_protocol::block::BlockNumber; + +use crate::mempool::StateConflict; +use crate::mempool::graph::edges::Edges; +use crate::mempool::graph::node::GraphNode; +use crate::mempool::graph::state::State; + +// GRAPH DAG +// ================================================================================================ + +#[derive(Clone, Debug, PartialEq)] +pub struct Graph +where + N: GraphNode, + N::Id: Eq + Hash + Copy, +{ + /// All nodes present in the graph. + nodes: HashMap, + /// The aggregate state of all nodes in the graph. + state: State, + /// The relation between the nodes formed by their dependencies on each others state. + edges: Edges, + /// Nodes that have been selected. + selected: HashSet, + /// Nodes that are available for selection. + /// + /// These are nodes whose parents have all been selected. + selection_candidates: BTreeSet, +} + +impl Default for Graph +where + N: GraphNode, + N::Id: Eq + Hash + Copy, +{ + fn default() -> Self { + Self { + nodes: HashMap::default(), + edges: Edges::default(), + selected: HashSet::default(), + selection_candidates: BTreeSet::default(), + state: State::default(), + } + } +} + +impl Graph +where + N: GraphNode, + N::Id: Eq + Hash + Copy + std::fmt::Display + Ord, +{ + /// Appends a node to the graph. 
+ /// + /// Parent-child edges are inferred from state dependencies: + /// - A note parent edge exists when this node consumes an unauthenticated note that was created + /// by the parent node. + /// - An account parent edge exists when this node's account update begins from the commitment + /// that the parent node transitioned the account to. + /// + /// # Errors + /// + /// Returns an error if the node's state does not build on top of the current graph state. + pub fn append(&mut self, node: N) -> Result<(), StateConflict> { + self.state.validate_append(&node)?; + + let id = node.id(); + let parents = self.state.apply_append(id, &node); + self.edges.insert(id, parents); + self.nodes.insert(id, node); + self.selection_check(id); + + Ok(()) + } + + /// Returns the set of nodes which can be selected. + /// + /// Candidates are nodes that are not currently selected, have all parents selected, and can be + /// handed directly to [`select_candidate`](Self::select_candidate). + pub fn selection_candidates(&self) -> BTreeMap<&N::Id, &N> { + self.selection_candidates + .iter() + .map(|id| (id, self.nodes.get(id).expect("selection_candidates is a subset of nodes"))) + .collect() + } + + /// Returns `true` if the given node was previously selected. + pub fn is_selected(&self, node: &N::Id) -> bool { + self.selected.contains(node) + } + + /// Marks the node as a selection candidate if all its parents are already selected. + fn selection_check(&mut self, id: N::Id) { + let parents = self.edges.parents_of(&id); + if parents.iter().all(|parent| self.is_selected(parent)) { + self.selection_candidates.insert(id); + } + } + + /// Marks the given node as unselected. + /// + /// # Panics + /// + /// Panics if the node was not previously selected or if any of its children are marked as + /// selected. 
+ pub fn deselect(&mut self, node: N::Id) { + assert!( + self.is_selected(&node), + "Cannot deselect node {node} which is not in selected set" + ); + + let children = self.edges.children_of(&node); + assert!( + children.iter().all(|child| !self.is_selected(child)), + "Cannot deselect node {node} which still has children selected", + ); + + self.selected.remove(&node); + // This makes the node a selection candidate by definition, and all its children should be + // removed as candidates. + self.selection_candidates.insert(node); + for child in self.edges.children_of(&node) { + self.selection_candidates.remove(child); + } + } + + /// Marks a node as selected. + /// + /// # Panics + /// + /// Panics if the given node is not a selection candidate. + pub fn select_candidate(&mut self, node: N::Id) { + assert!(!self.selected.contains(&node)); + assert!(self.edges.parents_of(&node).iter().all(|parent| self.selected.contains(parent))); + + self.selected.insert(node); + self.selection_candidates.remove(&node); + + // Its children are now potential new candidates. + let children = self.edges.children_of(&node).clone(); + for child in children { + self.selection_check(child); + } + } + + /// Returns the node and its descendants. + /// + /// That is, this returns the node's children, their children etc. + fn descendants(&self, node: &N::Id) -> HashSet { + let mut to_process = vec![*node]; + let mut descendants = HashSet::default(); + + while let Some(node) = to_process.pop() { + // Don't double process. + if descendants.contains(&node) { + continue; + } + let children = self.edges.children_of(&node); + to_process.extend(children); + descendants.insert(node); + } + + descendants + } + + /// Reverts the given node and all of its descendants, returning the reverted nodes. + /// + /// Nodes are reverted from leaves (nodes without children) backwards, and are returned in + /// that order. This is sort of a reverse chronological order i.e. 
this could be + /// reversed and re-inserted without error. + /// + /// # Panics + /// + /// Panics if the node does not exist or if the graph invariants (such as acyclicity) are + /// violated while unwinding descendants. The latter indicates graph corruption. + pub fn revert_node_and_descendants(&mut self, id: N::Id) -> Vec { + let mut descendants = self.descendants(&id); + + // This implementation is O(n^2) and could be improved by tracking the chronological order + // in which nodes are appended to the graph. This would let us revert in + // reverse-chronological order which _must_ succeed by definition. + // + // However that is quite a bit more code, and won't be worth doing for quite some time. + let mut reverted = Vec::new(); + 'outer: while !descendants.is_empty() { + for id in descendants.iter().copied() { + if self.is_leaf(&id) { + reverted.push(self.remove(id)); + descendants.remove(&id); + continue 'outer; + } + } + + panic!("failed to make progress"); + } + + reverted + } + + /// Reverts nodes (and their descendants) which have expired and which are _not_ selected. + /// + /// Returns the reverted nodes in **reverse** chronological order. + pub fn revert_expired_unselected(&mut self, chain_tip: BlockNumber) -> Vec { + let mut reverted = Vec::default(); + + let expired = self + .nodes + .iter() + .filter(|(id, _)| !self.is_selected(id)) + .filter_map(|(id, node)| (node.expires_at() <= chain_tip).then_some(id)) + .copied() + .collect::>(); + + for id in expired { + // Its possible the node is already reverted by a previous loop iteration. + if self.contains(&id) { + reverted.extend(self.revert_node_and_descendants(id)); + } + } + + reverted + } + + /// Returns `true` if the given node is a leaf node aka has no children. + fn is_leaf(&self, id: &N::Id) -> bool { + self.edges.children_of(id).is_empty() + } + + /// Removes the node _IFF_ it has no ancestor nodes. 
+ /// + /// # Panics + /// + /// Panics if this node has any ancestor nodes, or if this node was not selected. + pub fn prune(&mut self, id: N::Id) { + assert!( + self.edges.parents_of(&id).is_empty(), + "Cannot prune node {id} as it still has ancestors", + ); + assert!(self.selected.contains(&id), "Cannot prune node {id} as it was not selected"); + + self.remove(id); + } + + /// Unconditionally removes the given node from the graph, deleting its edges and state. + /// + /// This is an _internal_ helper, caller is responsible for ensuring that the graph won't be + /// corrupted by this removal. This is true if the node has no parents, or no children. + fn remove(&mut self, id: N::Id) -> N { + // Destructure so we are reminded to clean up new fields. + let Self { + nodes, + state, + edges, + selected, + selection_candidates, + } = self; + + let node = nodes.remove(&id).unwrap(); + state.remove(&node); + selected.remove(&id); + edges.remove(&id); + selection_candidates.remove(&id); + + node + } + + pub fn selected_count(&self) -> usize { + self.selected.len() + } + + pub fn node_count(&self) -> usize { + self.nodes.len() + } + + pub fn account_count(&self) -> usize { + self.state.account_count() + } + + pub fn nullifier_count(&self) -> usize { + self.state.nullifier_count() + } + + pub fn output_note_count(&self) -> usize { + self.state.output_note_count() + } + + pub fn contains(&self, node: &N::Id) -> bool { + self.nodes.contains_key(node) + } +} + +// GRAPH DAG TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use miden_protocol::block::BlockNumber; + + use super::*; + use crate::mempool::graph::node::test_node::TestNode; + + #[test] + fn child_becomes_candidate_after_parent_selection() { + let mut graph = Graph::::default(); + + graph + .append( + TestNode::new(1) + .with_output_notes([1]) + .with_expires_at(BlockNumber::from(10u32)), + ) + .unwrap(); + graph + .append( + 
TestNode::new(2) + .with_output_notes([2]) + .with_unauthenticated_notes([1]) + .with_expires_at(BlockNumber::from(10u32)), + ) + .unwrap(); + + let initial_candidates: Vec = + graph.selection_candidates().keys().map(|id| **id).collect(); + assert_eq!(initial_candidates, vec![1]); + + graph.select_candidate(1); + + let candidates_after_parent: Vec = + graph.selection_candidates().keys().map(|id| **id).collect(); + assert_eq!(candidates_after_parent, vec![2]); + } + + #[test] + fn revert_expired_unselected_removes_descendants() { + let mut graph = Graph::::default(); + + graph + .append( + TestNode::new(1).with_output_notes([1]).with_expires_at(BlockNumber::from(2u32)), + ) + .unwrap(); + graph + .append( + TestNode::new(2) + .with_output_notes([2]) + .with_unauthenticated_notes([1]) + .with_expires_at(BlockNumber::from(3u32)), + ) + .unwrap(); + + let reverted = graph.revert_expired_unselected(BlockNumber::from(3u32)); + let reverted_ids: Vec = reverted.into_iter().map(|node| node.id).collect(); + + assert_eq!(reverted_ids, vec![2, 1]); + assert_eq!(graph.node_count(), 0); + assert_eq!(graph.selection_candidates().len(), 0); + } +} diff --git a/crates/block-producer/src/mempool/graph/edges.rs b/crates/block-producer/src/mempool/graph/edges.rs new file mode 100644 index 0000000000..5fd8a7cfe9 --- /dev/null +++ b/crates/block-producer/src/mempool/graph/edges.rs @@ -0,0 +1,140 @@ +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; + +// GRAPH EDGES +// ================================================================================================ + +/// Maintains parent and child relationships between nodes in the mempool graph. +/// +/// `Edges` tracks both sides of the relationship to support efficient queries for a node's +/// parents (dependencies) and children (dependants) while keeping the two maps in sync. 
+#[derive(Clone, Debug, PartialEq)] +pub struct Edges +where + Id: Eq + Hash + Copy, +{ + parents: HashMap>, + children: HashMap>, +} + +impl Default for Edges +where + Id: Eq + Hash + Copy, +{ + fn default() -> Self { + Self { + parents: HashMap::default(), + children: HashMap::default(), + } + } +} + +impl Edges +where + Id: Eq + Hash + Copy, +{ + /// Registers a newly appended node's edges in the graph. + /// + /// Since it is newly appended, the node itself will only have parents and no children. + /// The inverse child relationships are updated accordingly. + /// + /// # Panics + /// + /// Panics if the node is already tracked, or the parent nodes are not tracked. + pub fn insert(&mut self, node: Id, parents: HashSet) { + assert!(!self.children.contains_key(&node)); + + self.parents.insert(node, parents.clone()); + self.children.insert(node, HashSet::default()); + + for parent in parents { + self.children.get_mut(&parent).unwrap().insert(node); + } + } + + /// Returns the parents of `node`. + /// + /// # Panics + /// + /// Panics if the node is not tracked. + pub fn parents_of(&self, node: &Id) -> &HashSet { + self.parents.get(node).unwrap() + } + + /// Returns the children of `node`. + /// + /// # Panics + /// + /// Panics if the node is not tracked. + pub fn children_of(&self, node: &Id) -> &HashSet { + self.children.get(node).unwrap() + } + + /// Removes the node from the edge set, updating all inverse relationships. + /// + /// # Panics + /// + /// Panics if the node is not tracked. 
+ pub fn remove(&mut self, node: &Id) { + let parents = self.parents.remove(node).expect("node must exist when removing from edges"); + + for parent in parents { + if let Some(children) = self.children.get_mut(&parent) { + children.remove(node); + } + } + + let children = + self.children.remove(node).expect("node must exist when removing from edges"); + + for child in children { + if let Some(parents) = self.parents.get_mut(&child) { + parents.remove(node); + } + } + } +} + +// GRAPH EDGES TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::Edges; + + #[test] + fn insert_adds_parent_child_relationships() { + let mut edges = Edges::::default(); + + edges.insert(1, HashSet::new()); + assert!(edges.parents_of(&1).is_empty()); + assert!(edges.children_of(&1).is_empty()); + + edges.insert(2, HashSet::from([1])); + + assert_eq!(edges.parents_of(&2), &HashSet::from([1])); + assert_eq!(edges.children_of(&1), &HashSet::from([2])); + assert!(edges.children_of(&2).is_empty()); + } + + #[test] + fn remove_updates_inverse_relationships() { + let mut edges = Edges::::default(); + + edges.insert(1, HashSet::new()); + edges.insert(2, HashSet::from([1])); + edges.insert(3, HashSet::from([2])); + + edges.remove(&2); + + assert!(edges.children_of(&1).is_empty()); + assert!(edges.parents_of(&3).is_empty()); + + edges.insert(4, HashSet::from([3])); + + assert_eq!(edges.parents_of(&4), &HashSet::from([3])); + assert_eq!(edges.children_of(&3), &HashSet::from([4])); + } +} diff --git a/crates/block-producer/src/mempool/graph/mod.rs b/crates/block-producer/src/mempool/graph/mod.rs new file mode 100644 index 0000000000..a624e3035a --- /dev/null +++ b/crates/block-producer/src/mempool/graph/mod.rs @@ -0,0 +1,9 @@ +mod batch; +mod dag; +mod edges; +mod node; +mod state; +mod transaction; + +pub use batch::BatchGraph; +pub use transaction::TransactionGraph; diff 
--git a/crates/block-producer/src/mempool/graph/node.rs b/crates/block-producer/src/mempool/graph/node.rs new file mode 100644 index 0000000000..ac036612c4 --- /dev/null +++ b/crates/block-producer/src/mempool/graph/node.rs @@ -0,0 +1,154 @@ +use miden_protocol::Word; +use miden_protocol::account::AccountId; +use miden_protocol::block::BlockNumber; +use miden_protocol::note::Nullifier; + +// GRAPH NODE +// ================================================================================================ + +/// Defines a node in the mempool graph. +pub trait GraphNode { + type Id; + + fn id(&self) -> Self::Id; + + /// All [`Nullifier`]s created by this node, **including** nullifiers for erased notes. This + /// may not be strictly necessary but it removes having to worry about reverting batches and + /// blocks with erased notes -- since these would otherwise have different state impact than + /// the transactions within them. + fn nullifiers(&self) -> Box + '_>; + + /// All output notes created by this node, **including** erased notes. This may not + /// be strictly necessary but it removes having to worry about reverting batches and blocks + /// with erased notes -- since these would otherwise have different state impact than the + /// transactions within them. + fn output_notes(&self) -> Box + '_>; + + /// Input notes which were not authenticated against any committed block thus far. + /// + /// Such notes are not yet known to exist by us (in the store) and must therefore be the output + /// of another node currently in flight in the graph in order to be considered valid. + fn unauthenticated_notes(&self) -> Box + '_>; + + /// The account state updates caused by this node. + /// + /// Output tuple represents each updates `(account ID, initial commitment, final commitment, + /// store commitment)`. + /// + /// Updates must be aggregates i.e. only a single account ID update allowed. 
+ fn account_updates( + &self, + ) -> Box)> + '_>; + + /// The block height at which this node is considered expired. + fn expires_at(&self) -> BlockNumber; +} + +// GRAPH NODE TESTS +// ================================================================================================ + +#[cfg(test)] +pub(crate) mod test_node { + use miden_protocol::Felt; + + use super::*; + + /// Lightweight [`GraphNode`] implementation for unit tests. + #[derive(Clone, Debug)] + pub struct TestNode { + pub id: u32, + pub nullifiers: Vec, + pub output_notes: Vec, + pub unauthenticated_notes: Vec, + pub account_updates: Vec<(AccountId, Word, Word, Option)>, + pub expires_at: BlockNumber, + } + + impl TestNode { + pub fn new(id: u32) -> Self { + Self { + id, + nullifiers: Vec::new(), + output_notes: Vec::new(), + unauthenticated_notes: Vec::new(), + account_updates: Vec::new(), + expires_at: BlockNumber::MAX, + } + } + + pub fn with_nullifiers(mut self, nullifiers: impl IntoIterator) -> Self { + self.nullifiers = nullifiers.into_iter().map(Self::to_nullifier).collect(); + self + } + + pub fn with_output_notes(mut self, notes: impl IntoIterator) -> Self { + self.output_notes = notes.into_iter().map(Self::to_word).collect(); + self + } + + pub fn with_unauthenticated_notes(mut self, notes: impl IntoIterator) -> Self { + self.unauthenticated_notes = notes.into_iter().map(Self::to_word).collect(); + self + } + + pub fn with_account_update(mut self, update: (AccountId, u32, u32, Option)) -> Self { + let (account, from, to, store) = update; + self.account_updates.push(( + account, + Self::to_word(from), + Self::to_word(to), + store.map(Self::to_word), + )); + self + } + + pub fn with_expires_at(mut self, expires_at: BlockNumber) -> Self { + self.expires_at = expires_at; + self + } + + fn to_word(value: u32) -> Word { + Word::from([Felt::from(value), Felt::ZERO, Felt::ZERO, Felt::ZERO]) + } + + fn to_nullifier(value: u32) -> Nullifier { + Nullifier::from_raw(Self::to_word(value)) + } + } + + 
impl Default for TestNode { + fn default() -> Self { + Self::new(0) + } + } + + impl GraphNode for TestNode { + type Id = u32; + + fn id(&self) -> Self::Id { + self.id + } + + fn nullifiers(&self) -> Box + '_> { + Box::new(self.nullifiers.iter().copied()) + } + + fn output_notes(&self) -> Box + '_> { + Box::new(self.output_notes.iter().copied()) + } + + fn unauthenticated_notes(&self) -> Box + '_> { + Box::new(self.unauthenticated_notes.iter().copied()) + } + + fn account_updates( + &self, + ) -> Box)> + '_> { + Box::new(self.account_updates.iter().copied()) + } + + fn expires_at(&self) -> BlockNumber { + self.expires_at + } + } +} diff --git a/crates/block-producer/src/mempool/graph/state.rs b/crates/block-producer/src/mempool/graph/state.rs new file mode 100644 index 0000000000..e690d10f43 --- /dev/null +++ b/crates/block-producer/src/mempool/graph/state.rs @@ -0,0 +1,467 @@ +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::fmt::Display; +use std::hash::Hash; + +use miden_protocol::Word; +use miden_protocol::account::AccountId; +use miden_protocol::note::Nullifier; + +use crate::errors::StateConflict; +use crate::mempool::graph::node::GraphNode; + +/// Tracks the shared state of the mempool graph that is required to validate and apply nodes. +#[derive(Clone, Debug, PartialEq)] +pub(super) struct State +where + K: Eq + Hash + Copy, +{ + nullifiers: HashSet, + notes_created: HashMap, + accounts: HashMap>, +} + +impl Default for State +where + K: Eq + Hash + Copy, +{ + fn default() -> Self { + Self { + nullifiers: HashSet::default(), + notes_created: HashMap::default(), + accounts: HashMap::default(), + } + } +} + +impl State +where + K: Eq + Hash + Copy, +{ + /// Ensures that `node` can be appended on top of the current state without conflicts. 
+ pub(super) fn validate_append(&self, node: &N) -> Result<(), StateConflict> + where + N: GraphNode, + { + let duplicate_nullifiers = node + .nullifiers() + .filter(|nullifier| self.nullifiers.contains(nullifier)) + .collect::>(); + if !duplicate_nullifiers.is_empty() { + return Err(StateConflict::NullifiersAlreadyExist(duplicate_nullifiers)); + } + + let duplicate_output_notes = node + .output_notes() + .filter(|note| self.notes_created.contains_key(note)) + .collect::>(); + if !duplicate_output_notes.is_empty() { + return Err(StateConflict::OutputNotesAlreadyExist(duplicate_output_notes)); + } + + let missing_input_notes = node + .unauthenticated_notes() + .filter(|note| !self.notes_created.contains_key(note)) + .collect::>(); + if !missing_input_notes.is_empty() { + return Err(StateConflict::UnauthenticatedNotesMissing(missing_input_notes)); + } + + for (account_id, from, _to, store) in node.account_updates() { + let current = self + .accounts + .get(&account_id) + .map(AccountStates::commitment) + .or(store) + .unwrap_or_default(); + + if from != current { + return Err(StateConflict::AccountCommitmentMismatch { + account: account_id, + expected: from, + current, + }); + } + } + + Ok(()) + } + + /// Applies `node` to the state, returning the set of parent node identifiers inferred from + /// state dependencies. 
+ pub(super) fn apply_append(&mut self, node_id: K, node: &N) -> HashSet + where + N: GraphNode, + { + let mut parents = HashSet::new(); + + self.nullifiers.extend(node.nullifiers()); + self.notes_created.extend(node.output_notes().map(|note| (note, node_id))); + + parents.extend(node.unauthenticated_notes().map(|note| { + *self + .notes_created + .get(¬e) + .expect("unauthenticated note must exist in the state") + })); + + for (account_id, from, to, _store) in node.account_updates() { + let account = self.accounts.entry(account_id); + + if from == to { + account + .and_modify(|account| { + parents.extend(account.current_owner()); + account.insert_pass_through(node_id); + }) + .or_insert_with(|| AccountStates::with_pass_through(to, node_id)) + } else { + account + .and_modify(|account| { + parents.extend(account.current_owner()); + parents.extend(account.current_pass_through()); + account.append_state(to, node_id); + }) + .or_insert_with(|| AccountStates::with_owner(to, node_id)) + }; + } + + parents + } + + /// Removes all state associated with `node`, undoing the effects of [`Self::apply_append`]. + pub(super) fn remove(&mut self, node: &N) + where + N: GraphNode, + N::Id: Display, + { + let node_id = node.id(); + + for nullifier in node.nullifiers() { + self.nullifiers.remove(&nullifier); + } + + for note in node.output_notes() { + self.notes_created.remove(¬e); + } + + for (account_id, from, to, ..) 
in node.account_updates() { + let Entry::Occupied(mut account_entry) = self.accounts.entry(account_id) else { + panic!( + "Cannot remove account {account_id} entry for node {node_id} as it does not exist" + ); + }; + + account_entry.get_mut().remove_node(&node_id, from, to); + + if account_entry.get().is_empty() { + account_entry.remove(); + } + } + } + + pub fn account_count(&self) -> usize { + self.accounts.len() + } + + pub fn nullifier_count(&self) -> usize { + self.nullifiers.len() + } + + pub fn output_note_count(&self) -> usize { + self.notes_created.len() + } +} + +/// Tracks the per-account state transitions that are in-flight within the mempool graph. +#[derive(Clone, Debug, PartialEq)] +struct AccountStates +where + K: Eq + Hash + Copy, +{ + commitment: Word, + nodes: VecDeque>, +} + +impl AccountStates +where + K: Eq + Hash + Copy, +{ + fn with_owner(commitment: Word, owner: K) -> Self { + let nodes = CommitmentNodes::with_owner(owner); + let nodes = VecDeque::from([nodes]); + + Self { commitment, nodes } + } + + fn with_pass_through(commitment: Word, node: K) -> Self { + let nodes = CommitmentNodes::with_pass_through(node); + let nodes = VecDeque::from([nodes]); + + Self { commitment, nodes } + } + + fn append_state(&mut self, commitment: Word, owner: K) { + self.commitment = commitment; + self.nodes.push_back(CommitmentNodes::with_owner(owner)); + } + + fn remove_node(&mut self, node: &K, from: Word, to: Word) { + if to == self.commitment { + let nodes = self.nodes.back_mut().unwrap(); + nodes.remove(node); + + if nodes.is_empty() { + self.nodes.pop_back(); + self.commitment = from; + } + } else { + let nodes = self.nodes.front_mut().unwrap(); + nodes.remove(node); + + if nodes.is_empty() { + self.nodes.pop_front(); + } + } + } + + fn is_empty(&self) -> bool { + self.nodes.is_empty() + } + + fn current_owner(&self) -> Option { + self.current_nodes().owner + } + + fn current_pass_through(&self) -> impl Iterator + '_ { + 
self.current_nodes().pass_through.iter().copied() + } + + fn insert_pass_through(&mut self, node: K) { + self.nodes + .back_mut() + .expect("current commitment must exist") + .pass_through + .insert(node); + } + + fn commitment(&self) -> Word { + self.commitment + } + + fn current_nodes(&self) -> &CommitmentNodes { + self.nodes.back().expect("current commitment must exist") + } +} + +/// Associates node identifiers with a single account commitment. +#[derive(Clone, Debug, PartialEq)] +struct CommitmentNodes +where + K: Eq + Hash + Copy, +{ + owner: Option, + pass_through: HashSet, +} + +impl Default for CommitmentNodes +where + K: Eq + Hash + Copy, +{ + fn default() -> Self { + Self { + owner: None, + pass_through: HashSet::default(), + } + } +} + +impl CommitmentNodes +where + K: Eq + Hash + Copy, +{ + fn with_owner(owner: K) -> Self { + Self { + owner: Some(owner), + pass_through: HashSet::default(), + } + } + + fn with_pass_through(node: K) -> Self { + Self { + owner: None, + pass_through: HashSet::from([node]), + } + } + + fn remove(&mut self, node: &K) { + if self.owner.as_ref() == Some(node) { + self.owner = None; + } + self.pass_through.remove(node); + } + + fn is_empty(&self) -> bool { + self.owner.is_none() && self.pass_through.is_empty() + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use miden_protocol::note::Nullifier; + use miden_protocol::{Felt, Word}; + + use super::*; + use crate::errors::StateConflict; + use crate::mempool::graph::node::test_node::TestNode; + use crate::test_utils::mock_account_id; + + fn word(value: u32) -> Word { + Word::from([Felt::from(value), Felt::ZERO, Felt::ZERO, Felt::ZERO]) + } + + fn nullifier(value: u32) -> Nullifier { + Nullifier::from_raw(word(value)) + } + + #[test] + fn validate_append_rejects_duplicate_nullifiers() { + let mut state = State::::default(); + let account_id = mock_account_id(1); + + let node_a = TestNode::new(1) + .with_nullifiers([1]) + .with_output_notes([11]) + 
.with_account_update((account_id, 0, 2, None)); + + state.validate_append(&node_a).unwrap(); + state.apply_append(node_a.id, &node_a); + + let node_b = TestNode::new(2) + .with_nullifiers([1]) + .with_output_notes([22]) + .with_unauthenticated_notes([11]) + .with_account_update((account_id, 2, 3, None)); + + match state.validate_append(&node_b) { + Err(StateConflict::NullifiersAlreadyExist(duplicates)) => { + assert_eq!(duplicates, vec![nullifier(1)]); + }, + other => panic!("expected duplicate nullifier error, found {other:?}"), + } + } + + #[test] + fn apply_append_registers_parents_and_counts() { + let mut state = State::::default(); + let account_id = mock_account_id(2); + + let node_a = TestNode::new(10) + .with_nullifiers([10]) + .with_output_notes([42]) + .with_account_update((account_id, 0, 5, None)); + + state.validate_append(&node_a).unwrap(); + state.apply_append(node_a.id, &node_a); + + let node_b = TestNode::new(11) + .with_output_notes([43]) + .with_unauthenticated_notes([42]) + .with_account_update((account_id, 5, 6, None)); + + state.validate_append(&node_b).unwrap(); + let parents = state.apply_append(node_b.id, &node_b); + + assert_eq!(parents, HashSet::from([node_a.id])); + assert_eq!(state.account_count(), 1); + assert_eq!(state.nullifier_count(), 1); + assert_eq!(state.output_note_count(), 2); + } + + #[test] + fn validate_append_rejects_duplicate_output_notes() { + let mut state = State::::default(); + let account_id = mock_account_id(4); + + let node_a = TestNode::new(30) + .with_output_notes([200]) + .with_account_update((account_id, 0, 5, None)); + state.validate_append(&node_a).unwrap(); + state.apply_append(node_a.id, &node_a); + + let node_b = TestNode::new(31) + .with_output_notes([200]) + .with_account_update((account_id, 5, 6, None)); + + match state.validate_append(&node_b) { + Err(StateConflict::OutputNotesAlreadyExist(duplicates)) => { + assert_eq!(duplicates, vec![word(200)]); + }, + other => panic!("expected duplicate output note 
error, found {other:?}"), + } + } + + #[test] + fn validate_append_rejects_unknown_unauthenticated_notes() { + let state = State::::default(); + let account_id = mock_account_id(5); + + let node = TestNode::new(40) + .with_unauthenticated_notes([300]) + .with_account_update((account_id, 0, 0, None)); + + match state.validate_append(&node) { + Err(StateConflict::UnauthenticatedNotesMissing(missing)) => { + assert_eq!(missing, vec![word(300)]); + }, + other => panic!("expected missing unauthenticated note error, found {other:?}"), + } + } + + #[test] + fn validate_append_rejects_account_commitment_mismatch() { + let state = State::::default(); + let account_id = mock_account_id(6); + + let node = TestNode::new(50).with_account_update((account_id, 400, 401, None)); + + match state.validate_append(&node) { + Err(StateConflict::AccountCommitmentMismatch { expected, current, .. }) => { + assert_eq!(expected, word(400)); + assert_eq!(current, Word::default()); + }, + other => panic!("expected account commitment mismatch error, found {other:?}"), + } + } + + #[test] + fn remove_cleans_up_account_state() { + let mut state = State::::default(); + let account_id = mock_account_id(3); + + let node_a = TestNode::new(21) + .with_nullifiers([21]) + .with_output_notes([100]) + .with_account_update((account_id, 0, 7, None)); + state.validate_append(&node_a).unwrap(); + state.apply_append(node_a.id, &node_a); + + let node_b = TestNode::new(22) + .with_output_notes([101]) + .with_account_update((account_id, 7, 8, None)); + state.validate_append(&node_b).unwrap(); + state.apply_append(node_b.id, &node_b); + + state.remove(&node_b); + assert_eq!(state.nullifier_count(), 1); + assert_eq!(state.output_note_count(), 1); + assert_eq!(state.account_count(), 1); + + state.remove(&node_a); + assert_eq!(state.nullifier_count(), 0); + assert_eq!(state.output_note_count(), 0); + assert_eq!(state.account_count(), 0); + } +} diff --git a/crates/block-producer/src/mempool/graph/transaction.rs 
b/crates/block-producer/src/mempool/graph/transaction.rs new file mode 100644 index 0000000000..6230f95fc3 --- /dev/null +++ b/crates/block-producer/src/mempool/graph/transaction.rs @@ -0,0 +1,186 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use miden_protocol::Word; +use miden_protocol::account::AccountId; +use miden_protocol::block::BlockNumber; +use miden_protocol::note::Nullifier; +use miden_protocol::transaction::TransactionId; + +use crate::domain::batch::SelectedBatch; +use crate::domain::transaction::AuthenticatedTransaction; +use crate::errors::StateConflict; +use crate::mempool::BatchBudget; +use crate::mempool::budget::BudgetStatus; +use crate::mempool::graph::dag::Graph; +use crate::mempool::graph::node::GraphNode; + +// TRANSACTION GRAPH NODE +// ================================================================================================ + +impl GraphNode for Arc { + type Id = TransactionId; + + fn nullifiers(&self) -> Box + '_> { + Box::new(self.as_ref().nullifiers()) + } + + fn output_notes(&self) -> Box + '_> { + Box::new(self.output_note_commitments()) + } + + fn unauthenticated_notes(&self) -> Box + '_> { + Box::new(self.unauthenticated_note_commitments()) + } + + fn account_updates( + &self, + ) -> Box)> + '_> { + let update = self.account_update(); + Box::new(std::iter::once(( + update.account_id(), + update.initial_state_commitment(), + update.final_state_commitment(), + self.store_account_state(), + ))) + } + + fn id(&self) -> Self::Id { + self.as_ref().id() + } + + fn expires_at(&self) -> BlockNumber { + self.as_ref().expires_at() + } +} + +// TRANSACTION GRAPH +// ================================================================================================ + +/// Tracks all [`AuthenticatedTransaction`]s that are waiting to be included in a batch. +/// +/// Each transaction is a node in the underlying [`Graph`]. 
A directed edge from transaction `P` +/// to transaction `C` exists when `C` depends on state produced by `P` — for example, `C` +/// consumes an output note created by `P`, or `C` updates an account from the state that `P` +/// left it in. +/// +/// The graph is maintained as a DAG: transactions are only inserted once all their parent +/// dependencies are already present, and reverting a transaction also reverts all its +/// descendants. +#[derive(Clone, Debug, PartialEq, Default)] +pub struct TransactionGraph { + inner: Graph>, +} + +impl TransactionGraph { + pub fn append(&mut self, tx: Arc) -> Result<(), StateConflict> { + self.inner.append(tx) + } + + pub fn select_batch(&mut self, mut budget: BatchBudget) -> Option { + let mut selected = SelectedBatch::builder(); + + while let Some((id, tx)) = self.inner.selection_candidates().pop_first() { + if budget.check_then_subtract(tx) == BudgetStatus::Exceeded { + break; + } + + selected.push(Arc::clone(tx)); + self.inner.select_candidate(*id); + } + + if selected.is_empty() { + return None; + } + let selected = selected.build(); + + Some(selected) + } + + /// Reverts expired transactions and their descendants. + /// + /// Only unselected transactions are considered; selected transactions are assumed to be in + /// committed blocks and should not be reverted. + /// + /// This is because we don't distinguish between committed and selected transactions. If we + /// didn't ignore selected transactions here, we would revert committed ones as well, which + /// breaks the state. + /// + /// Returns the identifiers of transactions that were removed from the graph. + /// + /// # Note + /// + /// Since this _ignores_ selected transactions, and the purpose is to revert expired + /// transactions after a block is committed, the caller **must** ensure that selected + /// transactions from expired batches (and therefore not committed) are deselected + /// _before_ calling this function. i.e. 
first revert expired batches and deselect their + /// transactions, then call this. + pub fn revert_expired(&mut self, chain_tip: BlockNumber) -> HashSet { + self.inner + .revert_expired_unselected(chain_tip) + .into_iter() + .map(|tx| tx.id()) + .collect() + } + + /// Reverts the given transaction and _all_ its descendants _IFF_ it is present in the graph. + /// + /// This includes batches that have been marked as proven. + /// + /// Returns the reverted transactions in the _reverse_ chronological order they were appended in. + pub fn revert_tx_and_descendants(&mut self, transaction: TransactionId) -> Vec { + // We need this check because `inner.revert..` panics if the node is unknown. + if !self.inner.contains(&transaction) { + return Vec::default(); + } + + self.inner + .revert_node_and_descendants(transaction) + .into_iter() + .map(|tx| tx.id()) + .collect() + } + + /// Marks the batch's transactions as ready for selection again. + /// + /// # Panics + /// + /// Panics if the given batch has any child batches which are still in flight. + pub fn requeue_transactions(&mut self, batch: SelectedBatch) { + for tx in batch.into_transactions().iter().rev() { + self.inner.deselect(tx.id()); + } + } + + /// Prunes the given transaction. + /// + /// # Panics + /// + /// Panics if the transaction does not exist, or has existing ancestors in the transaction + /// graph. + pub fn prune(&mut self, transaction: TransactionId) { + self.inner.prune(transaction); + } + + /// Number of transactions which have not been selected for inclusion in a batch. + pub fn unselected_count(&self) -> usize { + self.inner.node_count() - self.inner.selected_count() + } + + /// Total number of transactions in the graph.
+ pub fn count(&self) -> usize { + self.inner.node_count() + } + + pub fn accounts_count(&self) -> usize { + self.inner.account_count() + } + + pub fn nullifier_count(&self) -> usize { + self.inner.nullifier_count() + } + + pub fn output_note_count(&self) -> usize { + self.inner.output_note_count() + } +} diff --git a/crates/block-producer/src/mempool/inflight_state/tests.rs b/crates/block-producer/src/mempool/inflight_state/tests.rs deleted file mode 100644 index e4c72f6573..0000000000 --- a/crates/block-producer/src/mempool/inflight_state/tests.rs +++ /dev/null @@ -1,335 +0,0 @@ -use assert_matches::assert_matches; -use miden_node_utils::ErrorReport; -use miden_protocol::Word; - -use super::*; -use crate::test_utils::note::{mock_note, mock_output_note}; -use crate::test_utils::{MockProvenTxBuilder, mock_account_id}; - -#[test] -fn rejects_expired_transaction() { - let chain_tip = BlockNumber::from(123); - let mut uut = InflightState::new(chain_tip, 5, 0u32); - - let expired = MockProvenTxBuilder::with_account_index(0) - .expiration_block_num(chain_tip) - .build(); - let expired = - AuthenticatedTransaction::from_inner(expired).with_authentication_height(chain_tip); - - let err = uut.add_transaction(&expired).unwrap_err(); - assert_matches!(err, AddTransactionError::Expired { .. }); -} - -/// Ensures that the specified expiration slack is adhered to. 
-#[test] -fn expiration_slack_is_respected() { - let slack = 3; - let chain_tip = BlockNumber::from(123); - let expiration_limit = chain_tip + slack; - let mut uut = InflightState::new(chain_tip, 5, slack); - - let unexpired = MockProvenTxBuilder::with_account_index(0) - .expiration_block_num(expiration_limit + 1) - .build(); - let unexpired = - AuthenticatedTransaction::from_inner(unexpired).with_authentication_height(chain_tip); - - uut.add_transaction(&unexpired).unwrap(); - - let expired = MockProvenTxBuilder::with_account_index(1) - .expiration_block_num(expiration_limit) - .build(); - let expired = - AuthenticatedTransaction::from_inner(expired).with_authentication_height(chain_tip); - - let err = uut.add_transaction(&expired).unwrap_err(); - assert_matches!(err, AddTransactionError::Expired { .. }); -} - -#[test] -fn rejects_duplicate_nullifiers() { - let account = mock_account_id(1); - let states = (1u8..=4).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - - let note_seed = 123; - // We need to make the note available first, in order for it to be consumed at all. 
- let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]) - .output_notes(vec![mock_output_note(note_seed)]) - .build(); - let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]) - .unauthenticated_notes(vec![mock_note(note_seed)]) - .build(); - let tx2 = MockProvenTxBuilder::with_account(account, states[2], states[3]) - .unauthenticated_notes(vec![mock_note(note_seed)]) - .build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1)).unwrap(); - - let err = uut.add_transaction(&AuthenticatedTransaction::from_inner(tx2)).unwrap_err(); - - assert_matches!( - err, - AddTransactionError::VerificationFailed(VerifyTxError::InputNotesAlreadyConsumed( - notes - )) if notes == vec![mock_note(note_seed).nullifier()] - ); -} - -#[test] -fn rejects_duplicate_output_notes() { - let account = mock_account_id(1); - let states = (1u8..=3).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - - let note = mock_output_note(123); - let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]) - .output_notes(vec![note.clone()]) - .build(); - let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]) - .output_notes(vec![note.clone()]) - .build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); - - let err = uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1)).unwrap_err(); - - assert_matches!( - err, - AddTransactionError::VerificationFailed(VerifyTxError::OutputNotesAlreadyExist( - notes - )) if notes == vec![note.id()] - ); -} - -#[test] -fn rejects_account_state_mismatch() { - let account = mock_account_id(1); - let states = (1u8..=3).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - - let tx = MockProvenTxBuilder::with_account(account, states[0], 
states[1]).build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - let err = uut - .add_transaction(&AuthenticatedTransaction::from_inner(tx).with_store_state(states[2])) - .unwrap_err(); - - assert_matches!( - err, - AddTransactionError::VerificationFailed(VerifyTxError::IncorrectAccountInitialCommitment { - tx_initial_account_commitment: init_state, - current_account_commitment: current_state, - }) if init_state == states[0] && current_state == states[2].into() - ); -} - -#[test] -fn account_state_transitions() { - let account = mock_account_id(1); - let states = (1u8..=3).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - - let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); - let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]).build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1).with_empty_store_state()) - .unwrap(); -} - -#[test] -fn new_account_state_defaults_to_zero() { - let account = mock_account_id(1); - - let tx = - MockProvenTxBuilder::with_account(account, [0u8, 0, 0, 0].into(), [1u8, 0, 0, 0].into()) - .build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx).with_empty_store_state()) - .unwrap(); -} - -#[test] -fn inflight_account_state_overrides_input_state() { - let account = mock_account_id(1); - let states = (1u8..=3).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - - let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); - let tx1 = MockProvenTxBuilder::with_account(account, states[1], states[2]).build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0)).unwrap(); - - // Feed in an old state via input. 
This should be ignored, and the previous tx's final - // state should be used. - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1).with_store_state(states[0])) - .unwrap(); -} - -#[test] -fn dependency_tracking() { - let account = mock_account_id(1); - let states = (1u8..=3).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - let note_seed = 123; - - // Parent via account state. - let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); - // Parent via output note. - let tx1 = MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) - .output_notes(vec![mock_output_note(note_seed)]) - .build(); - - let tx = MockProvenTxBuilder::with_account(account, states[1], states[2]) - .unauthenticated_notes(vec![mock_note(note_seed)]) - .build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx0.clone())).unwrap(); - uut.add_transaction(&AuthenticatedTransaction::from_inner(tx1.clone())).unwrap(); - - let parents = uut - .add_transaction(&AuthenticatedTransaction::from_inner(tx).with_empty_store_state()) - .unwrap(); - let expected = BTreeSet::from([tx0.id(), tx1.id()]); - - assert_eq!(parents, expected); -} - -#[test] -fn committed_parents_are_not_tracked() { - let account = mock_account_id(1); - let states = (1u8..=3).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - let note_seed = 123; - - // Parent via account state. - let tx0 = MockProvenTxBuilder::with_account(account, states[0], states[1]).build(); - let tx0 = AuthenticatedTransaction::from_inner(tx0); - // Parent via output note. 
- let tx1 = MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) - .output_notes(vec![mock_output_note(note_seed)]) - .build(); - let tx1 = AuthenticatedTransaction::from_inner(tx1); - - let tx = MockProvenTxBuilder::with_account(account, states[1], states[2]) - .unauthenticated_notes(vec![mock_note(note_seed)]) - .build(); - - let mut uut = InflightState::new(BlockNumber::default(), 1, 0u32); - uut.add_transaction(&tx0.clone()).unwrap(); - uut.add_transaction(&tx1.clone()).unwrap(); - - // Commit the parents, which should remove them from dependency tracking. - uut.commit_block([tx0.id(), tx1.id()]); - - let parents = uut - .add_transaction(&AuthenticatedTransaction::from_inner(tx).with_empty_store_state()) - .unwrap(); - - assert!(parents.is_empty()); -} - -#[test] -fn tx_insertions_and_reversions_cancel_out() { - // Reverting txs should be equivalent to them never being inserted. - // - // We test this by reverting some txs and equating it to the remaining set. - // This is a form of property test. - let states = (1u8..=5).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - let txs = vec![ - MockProvenTxBuilder::with_account(mock_account_id(1), states[0], states[1]), - MockProvenTxBuilder::with_account(mock_account_id(1), states[1], states[2]) - .output_notes(vec![mock_output_note(111), mock_output_note(222)]), - MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) - .unauthenticated_notes(vec![mock_note(222)]), - MockProvenTxBuilder::with_account(mock_account_id(1), states[2], states[3]), - MockProvenTxBuilder::with_account(mock_account_id(2), states[1], states[2]) - .unauthenticated_notes(vec![mock_note(111)]) - .output_notes(vec![mock_output_note(45)]), - ]; - - let txs = txs - .into_iter() - .map(MockProvenTxBuilder::build) - .map(AuthenticatedTransaction::from_inner) - .collect::>(); - - for i in 0..states.len() { - // Insert all txs and then revert the last `i` of them. 
- // This should match only inserting the first `N-i` of them. - let mut reverted = InflightState::new(BlockNumber::default(), 1, 0u32); - for (idx, tx) in txs.iter().enumerate() { - reverted.add_transaction(tx).unwrap_or_else(|err| { - panic!("Inserting tx #{idx} in iteration {i} should succeed: {}", err.as_report()) - }); - } - reverted.revert_transactions( - txs.iter().rev().take(i).rev().map(AuthenticatedTransaction::id).collect(), - ); - - let mut inserted = InflightState::new(BlockNumber::default(), 1, 0u32); - for (idx, tx) in txs.iter().rev().skip(i).rev().enumerate() { - inserted.add_transaction(tx).unwrap_or_else(|err| { - panic!("Inserting tx #{idx} in iteration {i} should succeed: {}", err.as_report()) - }); - } - - assert_eq!(reverted, inserted, "Iteration {i}"); - } -} - -#[test] -fn pruning_committed_state() { - //! This is a form of property test, where we assert that pruning the first `i` of `N` - //! transactions is equivalent to only inserting the last `N-i` transactions. - let states = (1u8..=5).map(|x| Word::from([x, 0, 0, 0])).collect::>(); - - // Skipping initial txs means that output notes required for subsequent unauthenticated - // input notes wont' always be present. To work around this, we instead only use - // authenticated input notes. 
- let txs = vec![ - MockProvenTxBuilder::with_account(mock_account_id(1), states[0], states[1]), - MockProvenTxBuilder::with_account(mock_account_id(1), states[1], states[2]) - .output_notes(vec![mock_output_note(111), mock_output_note(222)]), - MockProvenTxBuilder::with_account(mock_account_id(2), states[0], states[1]) - .nullifiers(vec![mock_note(222).nullifier()]), - MockProvenTxBuilder::with_account(mock_account_id(1), states[2], states[3]), - MockProvenTxBuilder::with_account(mock_account_id(2), states[1], states[2]) - .nullifiers(vec![mock_note(111).nullifier()]) - .output_notes(vec![mock_output_note(45)]), - ]; - - let txs = txs - .into_iter() - .map(MockProvenTxBuilder::build) - .map(AuthenticatedTransaction::from_inner) - .collect::>(); - - for i in 0..states.len() { - // Insert all txs and then commit and prune the first `i` of them. - // - // This should match only inserting the final `N-i` transactions. - // Note: we force all committed state to immediately be pruned by setting - // it to zero. - let mut committed = InflightState::new(BlockNumber::default(), 0, 0u32); - for (idx, tx) in txs.iter().enumerate() { - committed.add_transaction(tx).unwrap_or_else(|err| { - panic!("Inserting tx #{idx} in iteration {i} should succeed: {}", err.as_report()) - }); - } - committed.commit_block(txs.iter().take(i).map(AuthenticatedTransaction::id)); - - let mut inserted = InflightState::new(BlockNumber::from(1), 0, 0u32); - for (idx, tx) in txs.iter().skip(i).enumerate() { - // We need to adjust the height since we are effectively at block "1" now. 
- let tx = tx.clone().with_authentication_height(1.into()); - inserted.add_transaction(&tx).unwrap_or_else(|err| { - panic!("Inserting tx #{idx} in iteration {i} should succeed: {}", err.as_report()) - }); - } - - assert_eq!(committed, inserted, "Iteration {i}"); - } -} diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 8d59f3ad4b..28fae5abad 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -25,7 +25,7 @@ //! committed within the same block. //! //! While this is technically possible, the bookkeeping and implementation to allow this are -//! infeasible, and both blocks and batches have constraints. This is also undersireable since if +//! infeasible, and both blocks and batches have constraints. This is also undesirable since if //! one component of such a cycle fails or expires, then all others would likewise need to be //! reverted. //! @@ -39,25 +39,34 @@ //! included in a batch (or are part of the currently proposed batch). //! - Similarly, batches are proposed for block inclusion once _all_ ancestors have been included in //! a block (or are part of the currently proposed block). -//! - Reverting a node reverts all descendents as well. - -use std::collections::{HashMap, HashSet}; +//! - Reverting a node reverts all descendants as well. +//! +//! The mempool maintains two DAGs: one for authenticated transactions awaiting batching and one for +//! batches awaiting inclusion in a block. As batches are selected, their constituent transactions +//! are marked in the transaction graph while the batch itself is appended to the batch graph. When +//! a block is proposed, the selected batches are staged in `pending_block` until the block is +//! either committed or rolled back. +//! +//! Recently committed batches are retained in `committed_blocks` according to the configured +//! 
`state_retention`, giving the mempool enough local history to validate newly authenticated +//! transactions even if the store and block producer momentarily disagree on the chain tip. +use std::collections::{HashSet, VecDeque}; use std::num::NonZeroUsize; use std::sync::Arc; use miden_node_proto::domain::mempool::MempoolEvent; +use miden_node_utils::ErrorReport; use miden_protocol::batch::{BatchId, ProvenBatch}; use miden_protocol::block::{BlockHeader, BlockNumber}; -use miden_protocol::transaction::TransactionId; +use miden_protocol::transaction::{TransactionHeader, TransactionId}; use subscription::SubscriptionProvider; use tokio::sync::{Mutex, MutexGuard, mpsc}; -use tracing::{instrument, warn}; +use tracing::instrument; +use crate::block_builder::SelectedBlock; use crate::domain::batch::SelectedBatch; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::{AddTransactionError, VerifyTxError}; -use crate::mempool::budget::BudgetStatus; -use crate::mempool::nodes::{BlockNode, Node, NodeId, ProposedBatchNode, TransactionNode}; +use crate::errors::{AddTransactionError, StateConflict}; use crate::{ COMPONENT, DEFAULT_MEMPOOL_TX_CAPACITY, @@ -68,8 +77,7 @@ use crate::{ mod budget; pub use budget::{BatchBudget, BlockBudget}; -mod nodes; -mod state; +mod graph; mod subscription; #[cfg(test)] @@ -81,7 +89,7 @@ mod tests; #[derive(Clone)] pub struct SharedMempool(Arc>); -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct MempoolConfig { /// The constraints each proposed block must adhere to. pub block_budget: BlockBudget, @@ -107,6 +115,11 @@ pub struct MempoolConfig { /// mempool handling the authenticated data. Retaining the recent blocks locally therefore /// guarantees that the mempool can verify the data against the additional changes so long as /// the data was authenticated against one of the retained blocks. 
+ /// + /// Practically, retaining `state_retention` blocks lets the mempool authenticate any + /// submission whose claimed height lies within `[chain_tip - state_retention + 1, + /// chain_tip]`. Inputs authenticated before this window are rejected as stale to prevent + /// gaps between the store and the locally retained history. pub state_retention: NonZeroUsize, /// The maximum number of uncommitted transactions allowed in the mempool at once. @@ -133,6 +146,10 @@ impl Default for MempoolConfig { // ================================================================================================ impl SharedMempool { + /// Acquires an asynchronous lock on the underlying [`Mempool`]. + /// + /// Callers should minimise the amount of work performed while holding the lock to reduce + /// contention with other subsystems that need to access the pool. #[instrument(target = COMPONENT, name = "mempool.lock", skip_all)] pub async fn lock(&self) -> MutexGuard<'_, Mempool> { self.0.lock().await @@ -142,14 +159,19 @@ impl SharedMempool { // MEMPOOL // ================================================================================================ -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Mempool { - /// Contains the aggregated state of all transactions, batches and blocks currently inflight in - /// the mempool. Combines with `nodes` to describe the mempool's state graph. - state: state::InflightState, - - /// Contains all the transactions, batches and blocks currently in the mempool. - nodes: nodes::Nodes, + /// Tracks the dependency graph for transactions awaiting batching. + transactions: graph::TransactionGraph, + /// Tracks the dependency graph for batches awaiting inclusion in a block. + batches: graph::BatchGraph, + /// The block currently being built, if any. + pending_block: Option, + /// The most recently committed blocks in chronological order. + /// + /// Limited to the state retention amount defined in the config. 
Once a pending block is + /// committed it is appended here, and the oldest block's state is pruned. + committed_blocks: VecDeque, chain_tip: BlockNumber, @@ -157,13 +179,6 @@ pub struct Mempool { subscription: subscription::SubscriptionProvider, } -// We have to implement this manually since the subscription channel does not implement PartialEq. -impl PartialEq for Mempool { - fn eq(&self, other: &Self) -> bool { - self.state == other.state && self.nodes == other.nodes - } -} - impl Mempool { // CONSTRUCTORS // -------------------------------------------------------------------------------------------- @@ -178,8 +193,10 @@ impl Mempool { config, chain_tip, subscription: SubscriptionProvider::new(chain_tip), - state: state::InflightState::default(), - nodes: nodes::Nodes::default(), + transactions: graph::TransactionGraph::default(), + batches: graph::BatchGraph::default(), + pending_block: None, + committed_blocks: VecDeque::default(), } } @@ -203,56 +220,27 @@ impl Mempool { /// /// # Errors /// - /// Returns an error if the transaction's initial conditions don't match the current state. + /// Returns an error if the transaction would exceed the mempool capacity or if its initial + /// conditions don't match the current state. + #[expect( + clippy::needless_pass_by_value, + reason = "Not impactful, and we may want ownership in the future" + )] #[instrument(target = COMPONENT, name = "mempool.add_transaction", skip_all, fields(tx=%tx.id()))] pub fn add_transaction( &mut self, tx: Arc, ) -> Result { - if self.nodes.uncommitted_tx_count() >= self.config.tx_capacity.get() { + if self.unbatched_transactions_count() >= self.config.tx_capacity.get() { return Err(AddTransactionError::CapacityExceeded); } self.authentication_staleness_check(tx.authentication_height())?; self.expiration_check(tx.expires_at())?; - - // The transaction should append to the existing mempool state. This means: - // - // - The current account commitment should match the tx's initial state. 
- // - No duplicate nullifiers are created. - // - No duplicate notes are created. - // - All unauthenticated notes exist as output notes. - let account_commitment = self - .state - .account_commitment(&tx.account_id()) - .or(tx.store_account_state()) - .unwrap_or_default(); - if tx.account_update().initial_state_commitment() != account_commitment { - return Err(VerifyTxError::IncorrectAccountInitialCommitment { - tx_initial_account_commitment: tx.account_update().initial_state_commitment(), - current_account_commitment: account_commitment, - } - .into()); - } - let double_spend = self.state.nullifiers_exist(tx.nullifiers()); - if !double_spend.is_empty() { - return Err(VerifyTxError::InputNotesAlreadyConsumed(double_spend).into()); - } - let duplicates = self.state.output_notes_exist(tx.output_note_commitments()); - if !duplicates.is_empty() { - return Err(VerifyTxError::OutputNotesAlreadyExist(duplicates).into()); - } - let missing = self.state.output_notes_missing(tx.unauthenticated_note_commitments()); - if !missing.is_empty() { - return Err(VerifyTxError::UnauthenticatedNotesNotFound(missing).into()); - } - - // Insert the transaction node. + self.transactions + .append(Arc::clone(&tx)) + .map_err(AddTransactionError::StateConflict)?; self.subscription.transaction_added(&tx); - let tx = TransactionNode::new(tx); - self.state.insert(NodeId::Transaction(tx.id()), &tx); - self.nodes.txs.insert(tx.id(), tx); - self.inject_telemetry(); Ok(self.chain_tip) @@ -265,79 +253,12 @@ impl Mempool { /// Returns `None` if no transactions are available. #[instrument(target = COMPONENT, name = "mempool.select_batch", skip_all)] pub fn select_batch(&mut self) -> Option { - // The selection algorithm is fairly neanderthal in nature. - // - // We iterate over all transaction nodes, each time selecting the first transaction which - // has no parent nodes that are unselected transactions. 
This is fairly primitive, but - // avoids the manual bookkeeping of which transactions are selectable. - // - // Note that selecting a transaction can unblock other transactions. This implementation - // handles this by resetting the iteration whenever a transaction is selected. - // - // This is still reasonably performant given that we only retain unselected transactions as - // transaction nodes i.e. selected transactions become batch nodes. - // - // The additional bookkeeping can be implemented once we have fee related strategies. KISS. - - let mut selected = SelectedBatch::builder(); - let mut budget = self.config.batch_budget; - - let mut candidates = self.nodes.txs.values(); - - 'next: while let Some(candidate) = candidates.next() { - if selected.contains(&candidate.id()) { - continue 'next; - } - - // A transaction may be placed in a batch IFF all parents were already in a batch (or - // are part of this one). - for parent in self.state.parents(NodeId::Transaction(candidate.id()), candidate) { - match parent { - // TODO(mirko): Once user batches are supported, they will also need to be - // checked here. 
- NodeId::Transaction(parent) if !selected.contains(&parent) => { - continue 'next; - }, - NodeId::Transaction(_) - | NodeId::ProposedBatch(_) - | NodeId::ProvenBatch(_) - | NodeId::Block(_) => {}, - } - } - - if budget.check_then_subtract(candidate.inner()) == BudgetStatus::Exceeded { - break; - } - - candidates = self.nodes.txs.values(); - selected.push(candidate.inner().clone()); - } - - if selected.is_empty() { - return None; + let batch = self.transactions.select_batch(self.config.batch_budget)?; + if let Err(err) = self.batches.append(batch.clone()) { + panic!("failed to append batch to dependency graph: {}", err.as_report()); } - let selected = selected.build(); - - let batch = ProposedBatchNode::new(selected.clone()); - let batch_id = batch.batch_id(); - - for tx in batch.transactions() { - let node = - self.nodes.txs.remove(&tx.id()).expect("selected transaction node must exist"); - self.state.remove(&node); - tracing::info!( - batch.id = %batch_id, - transaction.id = %tx.id(), - "Transaction selected for inclusion in batch" - ); - } - self.state.insert(NodeId::ProposedBatch(batch_id), &batch); - self.nodes.proposed_batches.insert(batch_id, batch); - - // TODO(mirko): Selecting a batch can unblock user batches, which should be checked here. - self.inject_telemetry(); - Some(selected) + Some(batch) } /// Drops the proposed batch and all of its descendants. @@ -345,52 +266,27 @@ impl Mempool { /// Transactions are re-queued. #[instrument(target = COMPONENT, name = "mempool.rollback_batch", skip_all)] pub fn rollback_batch(&mut self, batch: BatchId) { - // Due to the distributed nature of the system, its possible that a proposed batch was - // already proven, or already reverted. This guards against this eventuality. - if !self.nodes.proposed_batches.contains_key(&batch) { + // Guards against bugs in the proof scheduler where a retry results in multiple results + // coming back for the same batch. 
If the batch previously succeeded, then yanking it would + // corrupt the mempool since the batch might be in a block. + // + // Either way, we simply ignore rollbacks of batches that have already succeeded as a + // precaution. + if self.batches.is_proven(&batch) { return; } - // TODO(mirko): This will be somewhat complicated by the introduction of user batches - // since these don't have all inner txs present and therefore must at least partially - // revert their own tx descendents. - - // Remove all descendents and reinsert their transactions. This is safe to do since - // a batch's state impact is the aggregation of its transactions. - let reverted = self.revert_subtree(NodeId::ProposedBatch(batch)); - - for (_, node) in reverted { - for tx in node.transactions() { - let tx = TransactionNode::new(Arc::clone(tx)); - let tx_id = tx.id(); - self.state.insert(NodeId::Transaction(tx_id), &tx); - self.nodes.txs.insert(tx_id, tx); - tracing::info!( - batch.id = %batch, - transaction.id = %tx_id, - "Transaction requeued as part of batch rollback" - ); - } + let reverted_batches = self.batches.revert_batch_and_descendants(batch); + for reverted in reverted_batches { + self.transactions.requeue_transactions(reverted); } - self.inject_telemetry(); } /// Marks a batch as proven if it exists. #[instrument(target = COMPONENT, name = "mempool.commit_batch", skip_all)] pub fn commit_batch(&mut self, proof: Arc) { - // Due to the distributed nature of the system, its possible that a proposed batch was - // already proven, or already reverted. This guards against this eventuality. 
- let Some(proposed) = self.nodes.proposed_batches.remove(&proof.id()) else { - return; - }; - - self.state.remove(&proposed); - - let proven = proposed.into_proven_batch_node(proof); - self.state.insert(NodeId::ProvenBatch(proven.id()), &proven); - self.nodes.proven_batches.insert(proven.id(), proven); - + self.batches.submit_proof(proof); self.inject_telemetry(); } @@ -404,70 +300,20 @@ impl Mempool { /// /// Panics if there is already a block in flight. #[instrument(target = COMPONENT, name = "mempool.select_block", skip_all)] - pub fn select_block(&mut self) -> (BlockNumber, Vec>) { - // The selection algorithm is fairly neanderthal in nature. - // - // We iterate over all proven batch nodes, each time selecting the first which has no - // parent nodes that are unselected batches. - // - // Note that selecting a batch can unblock other batches. This implementation handles this - // by resetting the iteration whenever a batch is selected. - + pub fn select_block(&mut self) -> SelectedBlock { assert!( - self.nodes.proposed_block.is_none(), + self.pending_block.is_none(), "block {} is already in progress", - self.nodes.proposed_block.as_ref().unwrap().0 + self.pending_block.as_ref().unwrap().block_number ); let block_number = self.chain_tip.child(); - let mut selected = BlockNode::new(block_number); - let mut budget = self.config.block_budget; - let mut candidates = self.nodes.proven_batches.values(); - - 'next: while let Some(candidate) = candidates.next() { - if selected.contains(candidate.id()) { - continue 'next; - } - - // A batch is selectable if all parents are already blocks, or if the batch is part of - // the current block selection. 
- for parent in self.state.parents(NodeId::ProvenBatch(candidate.id()), candidate) { - match parent { - NodeId::Block(_) => {}, - NodeId::ProvenBatch(parent) if selected.contains(parent) => {}, - _ => continue 'next, - } - } - - if budget.check_then_subtract(candidate.inner()) == BudgetStatus::Exceeded { - break; - } - - // Reset iteration as this batch could have unblocked previous batches. - candidates = self.nodes.proven_batches.values(); - selected.push(candidate.clone()); - } - - // Replace the batches with the block in state and nodes. - for batch in selected.batches() { - // SAFETY: Selected batches came from nodes, and are unique. - let batch = self.nodes.proven_batches.remove(&batch.id()).unwrap(); - self.state.remove(&batch); - tracing::info!( - block.number = %block_number, - batch.id = %batch.id(), - "Batch selected for inclusion in block", - ); - } - - let block_number = self.chain_tip.child(); - let batches = selected.batches().to_vec(); - - self.state.insert(NodeId::Block(block_number), &selected); - self.nodes.proposed_block = Some((block_number, selected)); + let batches = self.batches.select_block(self.config.block_budget); + let block = SelectedBlock { block_number, batches }; + self.pending_block = Some(block.clone()); self.inject_telemetry(); - (block_number, batches) + block } /// Notify the pool that the in flight block was successfully committed to the chain. @@ -478,32 +324,34 @@ impl Mempool { /// Sends a [`MempoolEvent::BlockCommitted`] event to subscribers, as well as a /// [`MempoolEvent::TransactionsReverted`] for transactions that are now considered expired. /// - /// # Returns - /// - /// Returns a set of transactions that were purged from the mempool because they can no longer - /// be included in the chain (e.g., expired transactions and their descendants). 
+ /// On success the internal state is updated in place: the chain tip advances, expired data is + /// pruned, and subscribers are notified about the committed block and any reverted + /// transactions. /// /// # Panics /// /// Panics if there is no block in flight. #[instrument(target = COMPONENT, name = "mempool.commit_block", skip_all)] - pub fn commit_block(&mut self, to_commit: BlockHeader) { + pub fn commit_block(&mut self, block_header: BlockHeader) { + assert_eq!(self.chain_tip.child(), block_header.block_num()); let block = self - .nodes - .proposed_block - .take_if(|(proposed, _)| proposed == &to_commit.block_num()) + .pending_block + .take_if(|pending| pending.block_number == block_header.block_num()) .expect("block must be in progress to commit"); - let tx_ids = block.1.transactions().map(|tx| tx.id()).collect(); + let tx_ids = block + .batches + .iter() + .flat_map(|batch| batch.transactions().as_slice().iter()) + .map(miden_protocol::transaction::TransactionHeader::id) + .collect(); - self.nodes.committed_blocks.push_back(block); self.chain_tip = self.chain_tip.child(); - self.subscription.block_committed(to_commit, tx_ids); + self.subscription.block_committed(block_header, tx_ids); - if self.nodes.committed_blocks.len() > self.config.state_retention.get() { - let (_number, node) = self.nodes.committed_blocks.pop_front().unwrap(); - self.state.remove(&node); - } - let reverted_tx_ids = self.revert_expired_nodes(); + self.committed_blocks.push_back(block); + self.prune_oldest_block(); + + let reverted_tx_ids = self.revert_expired(); self.subscription.txs_reverted(reverted_tx_ids); self.inject_telemetry(); } @@ -514,10 +362,8 @@ impl Mempool { /// /// Sends a [`MempoolEvent::TransactionsReverted`] event to subscribers. 
/// - /// # Returns - /// - /// Returns a set of transaction IDs that were reverted because they can no longer be - /// included in in the chain (e.g., expired transactions and their descendants) + /// The in-flight block state and all related transactions are discarded, and subscribers are + /// notified about the reverted transactions. /// /// # Panics /// @@ -526,23 +372,14 @@ impl Mempool { pub fn rollback_block(&mut self, block: BlockNumber) { // Only revert if the given block is actually inflight. // - // This guards against extreme circumstances where multiple block proofs may be inflight at - // once. Due to the distributed nature of the node, one can imagine a scenario where - // multiple provers get the same job for example. - // // FIXME: We should consider a more robust check here to identify the block by a hash. // If multiple jobs are possible, then so are multiple variants with the same block // number. - if self - .nodes - .proposed_block - .as_ref() - .is_none_or(|(proposed, _)| proposed != &block) - { + if self.pending_block.as_ref().is_none_or(|pending| pending.block_number != block) { return; } - // Remove all descendents _without_ reinserting the transactions. + // Remove all descendants _without_ reinserting the transactions. // // This is done to prevent a system bug from causing repeated failures if we keep retrying // the same transactions. Since we can't trivially identify the cause of the block @@ -550,33 +387,19 @@ impl Mempool { // // A more refined approach could be to tag the offending transactions and then evict them // once a certain failure threshold has been met. - let reverted = self.revert_subtree(NodeId::Block(block)); let mut reverted_txs = HashSet::default(); + let block = self.pending_block.take().expect("we just checked it is some"); + for batch in block.batches { + let reverted = self.batches.revert_batch_and_descendants(batch.id()); - // Log reverted batches and transactions. 
- for (id, node) in reverted { - match id { - NodeId::ProposedBatch(batch_id) | NodeId::ProvenBatch(batch_id) => { - tracing::info!( - block.number=%block, - batch.id=%batch_id, - "Reverted batch as part of block rollback" - ); - }, - NodeId::Transaction(_) | NodeId::Block(_) => {}, - } - - for tx in node.transactions() { - reverted_txs.insert(tx.id()); - tracing::info!( - block.number=%block, - transaction.id=%tx.id(), - "Reverted transaction as part of block rollback" - ); + for batch in reverted { + for tx in batch.into_transactions() { + reverted_txs.extend(self.transactions.revert_tx_and_descendants(tx.id())); + } } } - self.subscription.txs_reverted(reverted_txs); + self.subscription.txs_reverted(reverted_txs); self.inject_telemetry(); } @@ -596,17 +419,17 @@ impl Mempool { /// Returns the number of transactions currently waiting to be batched. pub fn unbatched_transactions_count(&self) -> usize { - self.nodes.txs.len() + self.transactions.unselected_count() } /// Returns the number of batches currently being proven. pub fn proposed_batches_count(&self) -> usize { - self.nodes.proposed_batches.len() + self.batches.proposed_count() } /// Returns the number of proven batches waiting for block inclusion. pub fn proven_batches_count(&self) -> usize { - self.nodes.proven_batches.len() + self.batches.proven_count() } // INTERNAL HELPERS @@ -617,121 +440,79 @@ impl Mempool { /// Note that these are only visible in the OpenTelemetry context, as conventional tracing /// does not track fields added dynamically. 
fn inject_telemetry(&self) { + use miden_node_utils::tracing::OpenTelemetrySpanExt; let span = tracing::Span::current(); - self.nodes.inject_telemetry(&span); - self.state.inject_telemetry(&span); + let committed_txs = self + .committed_blocks + .iter() + .flat_map(|block| block.batches.iter()) + .map(|batch| batch.transactions().as_slice().len()) + .sum::(); + span.set_attribute( + "mempool.transactions.uncommitted", + self.transactions.count() - committed_txs, + ); + span.set_attribute("mempool.transactions.unbatched", self.unbatched_transactions_count()); + span.set_attribute("mempool.batches.proposed", self.proposed_batches_count()); + span.set_attribute("mempool.batches.proven", self.proven_batches_count()); + + span.set_attribute("mempool.accounts", self.transactions.accounts_count()); + span.set_attribute("mempool.nullifiers", self.transactions.nullifier_count()); + span.set_attribute("mempool.output_notes", self.transactions.output_note_count()); } - /// Reverts expired transactions and batches as per the current `chain_tip`. + /// Prunes the oldest locally retained block if the number of blocks exceeds the configured + /// limit. /// - /// Returns the list of all transactions that were reverted. 
- fn revert_expired_nodes(&mut self) -> HashSet { - let expired_txs = self.nodes.txs.iter().filter_map(|(id, node)| { - (node.expires_at() <= self.chain_tip).then_some(NodeId::Transaction(*id)) - }); - let expired_proposed_batches = - self.nodes.proposed_batches.iter().filter_map(|(id, node)| { - (node.expires_at() <= self.chain_tip).then_some(NodeId::ProposedBatch(*id)) - }); - let expired_proven_batches = self.nodes.proven_batches.iter().filter_map(|(id, node)| { - (node.expires_at() <= self.chain_tip).then_some(NodeId::ProvenBatch(*id)) - }); - - let expired = expired_proven_batches - .chain(expired_proposed_batches) - .chain(expired_txs) - .collect::>(); - let mut reverted_txs = HashSet::default(); - for expired_id in expired { - let reverted = self.revert_subtree(expired_id); - for (id, node) in reverted { - match id { - NodeId::ProposedBatch(batch_id) | NodeId::ProvenBatch(batch_id) => { - tracing::info!( - ancestor=?expired_id, - batch.id=%batch_id, - "Reverted batch due to expiration of ancestor" - ); - }, - NodeId::Transaction(_) => {}, - NodeId::Block(block_number) => panic!( - "Found block {block_number} descendent while reverting expired nodes which shouldn't be possible since only one block is in progress" - ), - } - - for tx in node.transactions() { - reverted_txs.insert(tx.id()); - tracing::info!( - ancestor=?expired_id, - transaction.id=%tx.id(), - "Reverted transaction due to expiration of ancestor" - ); - } - } + /// This includes pruning the block's batches and transactions from their graphs. + fn prune_oldest_block(&mut self) { + if self.committed_blocks.len() <= self.config.state_retention.get() { + return; } + let block = self.committed_blocks.pop_front().unwrap(); - reverted_txs - } - - /// Reverts the subtree with the given root and returns the reverted nodes. Does nothing if the - /// root node does not exist to allow using this in cases where multiple overlapping calls to - /// this are made. 
- fn revert_subtree(&mut self, root: NodeId) -> HashMap> { - let root_exists = match root { - NodeId::Transaction(id) => self.nodes.txs.contains_key(&id), - NodeId::ProposedBatch(id) => self.nodes.proposed_batches.contains_key(&id), - NodeId::ProvenBatch(id) => self.nodes.proven_batches.contains_key(&id), - NodeId::Block(id) => { - self.nodes.proposed_block.as_ref().is_some_and(|(number, _)| *number == id) - }, - }; - if !root_exists { - return HashMap::default(); + // We perform pruning in chronological order, from oldest to youngest. + // + // Pruning a node requires that the node has no parents, and using chronological + // order gives us this property. This works because a batch can only be included in + // a block once _all_ its parents have been included. So if we follow the same order, + // it means that a batch's parents would already have been pruned. + // + // The same logic follows for transactions. + for batch in block.batches.iter().map(|batch| batch.id()) { + self.batches.prune(batch); } - let mut to_process = vec![root]; - let mut reverted = HashMap::default(); - - while let Some(id) = to_process.pop() { - if reverted.contains_key(&id) { - continue; - } - - // SAFETY: all IDs come from the state DAG and must therefore exist. The processed check - // above also prevents removing a node twice. 
- let node: Box = match id { - NodeId::Transaction(id) => { - self.nodes.txs.remove(&id).map(|x| Box::new(x) as Box) - }, - NodeId::ProposedBatch(id) => { - self.nodes.proposed_batches.remove(&id).map(|x| Box::new(x) as Box) - }, - NodeId::ProvenBatch(id) => { - self.nodes.proven_batches.remove(&id).map(|x| Box::new(x) as Box) - }, - NodeId::Block(id) => self - .nodes - .proposed_block - .take_if(|(number, _)| number == &id) - .map(|(_, x)| Box::new(x) as Box), - } - .unwrap(); - - to_process.extend(self.state.children(id, node.as_ref())); - self.state.remove(node.as_ref()); - reverted.insert(id, node); + for tx in block + .batches + .iter() + .flat_map(|batch| batch.transactions().as_slice()) + .map(TransactionHeader::id) + { + self.transactions.prune(tx); } + } - reverted + /// Reverts all batches and transactions that have expired. + /// + /// Expired batch descendants are also reverted since these are now invalid. + /// + /// Transactions from batches are requeued. Expired transactions and their descendants are then + /// reverted as well. + fn revert_expired(&mut self) -> HashSet { + let batches = self.batches.revert_expired(self.chain_tip); + for batch in batches { + self.transactions.requeue_transactions(batch); + } + self.transactions.revert_expired(self.chain_tip) } - /// Rejects authentication height's which we cannot guarantee are correct from the locally + /// Rejects authentication heights that fall outside the overlap guaranteed by the locally /// retained state. /// - /// In other words, this returns an error if the authentication height is more than one block - /// older than the locally retained state. One block is allowed because this means block `N-1` - /// was authenticated by the store, and we can check blocks `N..chain_tip`. + /// The acceptable window is `[chain_tip - state_retention + 1, chain_tip]`; values below this + /// range are rejected as stale because the mempool no longer tracks the intermediate history. 
/// /// # Panics /// @@ -742,8 +523,10 @@ impl Mempool { &self, authentication_height: BlockNumber, ) -> Result<(), AddTransactionError> { - let oldest = self.nodes.oldest_committed_block().unwrap_or_default(); - let limit = oldest.parent().unwrap_or_default(); + let limit = self + .chain_tip + .checked_sub(self.committed_blocks.len() as u32) + .expect("number of committed blocks cannot exceed the chain tip"); if authentication_height < limit { return Err(AddTransactionError::StaleInputs { @@ -752,11 +535,10 @@ impl Mempool { }); } - let latest_block = - self.nodes.proposed_block.as_ref().map_or(self.chain_tip, |(number, _)| *number); assert!( - authentication_height <= latest_block, - "Authentication height {authentication_height} exceeded the latest known block {latest_block}" + authentication_height <= self.chain_tip, + "Authentication height {authentication_height} exceeded the chain tip {}", + self.chain_tip ); Ok(()) diff --git a/crates/block-producer/src/mempool/nodes.rs b/crates/block-producer/src/mempool/nodes.rs deleted file mode 100644 index 6ae39d37d0..0000000000 --- a/crates/block-producer/src/mempool/nodes.rs +++ /dev/null @@ -1,430 +0,0 @@ -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; - -use miden_protocol::Word; -use miden_protocol::account::AccountId; -use miden_protocol::batch::{BatchId, ProvenBatch}; -use miden_protocol::block::BlockNumber; -use miden_protocol::note::{NoteHeader, Nullifier}; -use miden_protocol::transaction::{InputNoteCommitment, TransactionHeader, TransactionId}; - -use crate::domain::batch::SelectedBatch; -use crate::domain::transaction::AuthenticatedTransaction; - -/// Uniquely identifies a node in the mempool. -/// -/// This effectively describes the lifecycle of a transaction in the mempool. 
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub(super) enum NodeId { - Transaction(TransactionId), - // UserBatch(BatchId), - ProposedBatch(BatchId), - ProvenBatch(BatchId), - Block(BlockNumber), -} - -/// A node representing a [`AuthenticatedTransaction`] which is waiting for inclusion in a batch. -/// -/// Once this is selected for inclusion in a batch it will be moved inside a [`ProposedBatchNode`]. -#[derive(Clone, Debug, PartialEq)] -pub(super) struct TransactionNode(Arc); - -impl TransactionNode { - pub(super) fn new(inner: Arc) -> Self { - Self(inner) - } - - pub(super) fn id(&self) -> TransactionId { - self.0.id() - } - - pub(super) fn inner(&self) -> &Arc { - &self.0 - } - - pub(super) fn expires_at(&self) -> BlockNumber { - self.0.expires_at() - } -} - -/// Represents a batch which has been proposed by the mempool and which is undergoing proving. -/// -/// Once proven it transitions to a [`ProvenBatchNode`]. -#[derive(Clone, Debug, PartialEq)] -pub(super) struct ProposedBatchNode(SelectedBatch); - -impl ProposedBatchNode { - pub(super) fn new(batch: SelectedBatch) -> Self { - Self(batch) - } - - pub(super) fn into_proven_batch_node(self, proof: Arc) -> ProvenBatchNode { - ProvenBatchNode { - txs: self.0.into_transactions(), - inner: proof, - } - } - - pub(super) fn expires_at(&self) -> BlockNumber { - self.0.txs().iter().map(|tx| tx.expires_at()).min().unwrap_or_default() - } - - pub(super) fn batch_id(&self) -> BatchId { - self.0.id() - } -} - -/// Represents a [`ProvenBatch`] which is waiting for inclusion in a block. -#[derive(Clone, Debug, PartialEq)] -pub(super) struct ProvenBatchNode { - /// We need to store this in addition to the proven batch because [`ProvenBatch`] erases the - /// transaction information. We need the original information if we want to rollback the batch - /// but retain the transactions. 
- txs: Vec>, - inner: Arc, -} - -impl ProvenBatchNode { - pub(super) fn tx_headers(&self) -> impl Iterator { - self.inner.transactions().as_slice().iter() - } - - pub(super) fn id(&self) -> BatchId { - self.inner.id() - } - - pub(super) fn inner(&self) -> &Arc { - &self.inner - } - - pub(super) fn expires_at(&self) -> BlockNumber { - self.inner.batch_expiration_block_num() - } -} - -/// Represents a block - both committed and in-progress. -#[derive(Clone, Debug, PartialEq)] -pub(super) struct BlockNode { - txs: Vec>, - batches: Vec>, - number: BlockNumber, - /// Aggregated account updates of all batches. - account_updates: HashMap, -} - -impl BlockNode { - pub(super) fn new(number: BlockNumber) -> Self { - Self { - number, - txs: Vec::default(), - batches: Vec::default(), - account_updates: HashMap::default(), - } - } - - pub(super) fn push(&mut self, batch: ProvenBatchNode) { - let ProvenBatchNode { txs, inner: batch } = batch; - for (account, update) in batch.account_updates() { - self.account_updates - .entry(*account) - .and_modify(|(_, to)| { - assert!( - to == &update.initial_state_commitment(), - "Cannot select batch {} as its initial commitment {} for account {} does \ - not match the current commitment {}", - batch.id(), - update.initial_state_commitment(), - update.account_id(), - to - ); - - *to = update.final_state_commitment(); - }) - .or_insert((update.initial_state_commitment(), update.final_state_commitment())); - } - - self.txs.extend(txs); - self.batches.push(batch); - } - - pub(super) fn contains(&self, id: BatchId) -> bool { - self.batches.iter().any(|batch| batch.id() == id) - } - - pub(super) fn batches(&self) -> &[Arc] { - &self.batches - } -} - -/// Describes a node's impact on the state. -/// -/// This is used to determine what state data is created or consumed by this node. -pub(super) trait Node { - /// All [`Nullifier`]s created by this node, **including** nullifiers for erased notes. 
This - /// may not be strictly necessary but it removes having to worry about reverting batches and - /// blocks with erased notes -- since these would otherwise have different state impact than - /// the transactions within them. - fn nullifiers(&self) -> Box + '_>; - - /// All output note commitments created by this node, **including** erased notes. This may not - /// be strictly necessary but it removes having to worry about reverting batches and blocks - /// with erased notes -- since these would otherwise have different state impact than the - /// transactions within them. - fn output_note_commitments(&self) -> Box + '_>; - fn unauthenticated_note_commitments(&self) -> Box + '_>; - /// The account state commitment updates caused by this node. - /// - /// Output tuple represents each updates `(account ID, initial commitment, final commitment)`. - /// - /// Updates must be aggregates i.e. only a single account ID update allowed. - fn account_updates(&self) -> Box + '_>; - fn transactions(&self) -> Box> + '_>; - - fn id(&self) -> NodeId; -} - -impl Node for TransactionNode { - fn nullifiers(&self) -> Box + '_> { - Box::new(self.0.nullifiers()) - } - - fn output_note_commitments(&self) -> Box + '_> { - Box::new(self.0.output_note_commitments()) - } - - fn unauthenticated_note_commitments(&self) -> Box + '_> { - Box::new(self.0.unauthenticated_note_commitments()) - } - - fn account_updates(&self) -> Box + '_> { - let update = self.0.account_update(); - Box::new(std::iter::once(( - update.account_id(), - update.initial_state_commitment(), - update.final_state_commitment(), - ))) - } - - fn transactions(&self) -> Box> + '_> { - Box::new(std::iter::once(&self.0)) - } - - fn id(&self) -> NodeId { - NodeId::Transaction(self.id()) - } -} - -impl Node for ProposedBatchNode { - fn nullifiers(&self) -> Box + '_> { - Box::new(self.0.txs().iter().flat_map(|tx| tx.nullifiers())) - } - - fn output_note_commitments(&self) -> Box + '_> { - 
Box::new(self.0.txs().iter().flat_map(|tx| tx.output_note_commitments())) - } - - fn unauthenticated_note_commitments(&self) -> Box + '_> { - Box::new(self.0.txs().iter().flat_map(|tx| tx.unauthenticated_note_commitments())) - } - - fn account_updates(&self) -> Box + '_> { - Box::new(self.0.account_updates()) - } - - fn transactions(&self) -> Box> + '_> { - Box::new(self.0.txs().iter()) - } - - fn id(&self) -> NodeId { - NodeId::ProposedBatch(self.0.id()) - } -} - -impl Node for ProvenBatchNode { - fn nullifiers(&self) -> Box + '_> { - Box::new( - self.tx_headers() - .flat_map(|tx| tx.input_notes().iter().map(InputNoteCommitment::nullifier)), - ) - } - - fn output_note_commitments(&self) -> Box + '_> { - Box::new( - self.tx_headers() - .flat_map(|tx| tx.output_notes().iter().map(NoteHeader::to_commitment)), - ) - } - - fn unauthenticated_note_commitments(&self) -> Box + '_> { - Box::new( - self.inner - .input_notes() - .iter() - .filter_map(|note| note.header()) - .map(NoteHeader::to_commitment), - ) - } - - fn account_updates(&self) -> Box + '_> { - Box::new(self.inner.account_updates().values().map(|update| { - ( - update.account_id(), - update.initial_state_commitment(), - update.final_state_commitment(), - ) - })) - } - - fn transactions(&self) -> Box> + '_> { - Box::new(self.txs.iter()) - } - - fn id(&self) -> NodeId { - NodeId::ProvenBatch(self.id()) - } -} - -impl Node for BlockNode { - fn nullifiers(&self) -> Box + '_> { - Box::new(self.txs.iter().flat_map(|tx| tx.nullifiers())) - } - - fn output_note_commitments(&self) -> Box + '_> { - Box::new( - self.txs - .iter() - .flat_map(|tx: &Arc| tx.output_note_commitments()), - ) - } - - fn unauthenticated_note_commitments(&self) -> Box + '_> { - Box::new(self.batches.iter().flat_map(|batch| { - batch - .input_notes() - .iter() - .filter_map(|note| note.header().map(NoteHeader::to_commitment)) - })) - } - - fn account_updates(&self) -> Box + '_> { - Box::new(self.account_updates.iter().map(|(account, (from, to))| 
(*account, *from, *to))) - } - - fn transactions(&self) -> Box> + '_> { - Box::new(self.txs.iter()) - } - - fn id(&self) -> NodeId { - NodeId::Block(self.number) - } -} - -/// Contains the current nodes of the state DAG. -/// -/// Nodes are purposefully not stored as a single collection since we often want to iterate -/// through specific node types e.g. all available transactions. -/// -/// This data _must_ be kept in sync with the [`InflightState's`] [`NodeIds`] since these are -/// used as the edges of the graph. -#[derive(Clone, Debug, PartialEq, Default)] -pub(super) struct Nodes { - // Nodes in the DAG - pub(super) txs: HashMap, - // user_batches: HashMap, - pub(super) proposed_batches: HashMap, - pub(super) proven_batches: HashMap, - pub(super) proposed_block: Option<(BlockNumber, BlockNode)>, - pub(super) committed_blocks: VecDeque<(BlockNumber, BlockNode)>, -} - -impl Nodes { - pub(super) fn oldest_committed_block(&self) -> Option { - self.committed_blocks.front().map(|(number, _)| *number) - } - - pub(super) fn inject_telemetry(&self, span: &tracing::Span) { - use miden_node_utils::tracing::OpenTelemetrySpanExt; - - span.set_attribute("mempool.transactions.uncommitted", self.uncommitted_tx_count()); - span.set_attribute("mempool.transactions.unbatched", self.txs.len()); - span.set_attribute("mempool.batches.proposed", self.proposed_batches.len()); - span.set_attribute("mempool.batches.proven", self.proven_batches.len()); - } - - pub(super) fn uncommitted_tx_count(&self) -> usize { - self.txs.len() - + self.proposed_batches.values().map(|b| b.0.txs().len()).sum::() - + self.proven_batches.values().map(|b| b.txs.len()).sum::() - + self.proposed_block.as_ref().map(|b| b.1.txs.len()).unwrap_or_default() - } -} - -#[cfg(test)] -mod tests { - use std::collections::BTreeMap; - - use miden_protocol::batch::BatchAccountUpdate; - use miden_protocol::transaction::{InputNotes, OrderedTransactionHeaders}; - - use super::*; - use crate::test_utils::MockProvenTxBuilder; 
- - #[test] - fn proposed_batch_aggregates_account_updates() { - let mut batch = SelectedBatch::builder(); - let txs = MockProvenTxBuilder::sequential(); - - let account = txs.first().unwrap().account_id(); - let from = txs.first().unwrap().account_update().initial_state_commitment(); - let to = txs.last().unwrap().account_update().final_state_commitment(); - let expected = std::iter::once((account, from, to)); - - for tx in txs { - batch.push(tx); - } - let batch = ProposedBatchNode::new(batch.build()); - - itertools::assert_equal(batch.account_updates(), expected); - } - - #[test] - fn block_aggregates_account_updates() { - // We map each tx into its own batch. - // - // This let's us trivially know what the expected aggregate block account update should be. - let txs = MockProvenTxBuilder::sequential(); - let account = txs.first().unwrap().account_id(); - let from = txs.first().unwrap().account_update().initial_state_commitment(); - let to = txs.last().unwrap().account_update().final_state_commitment(); - let expected = std::iter::once((account, from, to)); - - let mut block = BlockNode::new(BlockNumber::default()); - - for tx in txs { - let mut batch = SelectedBatch::builder(); - batch.push(tx.clone()); - let batch = batch.build(); - let batch = ProposedBatchNode::new(batch); - - let account_update = BatchAccountUpdate::from_transaction(tx.raw_proven_transaction()); - - let tx_header = TransactionHeader::from(tx.raw_proven_transaction()); - let proven_batch = ProvenBatch::new( - batch.batch_id(), - Word::default(), - BlockNumber::default(), - BTreeMap::from([(account_update.account_id(), account_update)]), - InputNotes::default(), - Vec::default(), - BlockNumber::MAX, - OrderedTransactionHeaders::new_unchecked(vec![tx_header]), - ) - .unwrap(); - - let batch = batch.into_proven_batch_node(Arc::new(proven_batch)); - block.push(batch); - } - - itertools::assert_equal(block.account_updates(), expected); - } -} diff --git 
a/crates/block-producer/src/mempool/state.rs b/crates/block-producer/src/mempool/state.rs deleted file mode 100644 index 93c16f6b6c..0000000000 --- a/crates/block-producer/src/mempool/state.rs +++ /dev/null @@ -1,333 +0,0 @@ -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; - -use miden_protocol::Word; -use miden_protocol::account::AccountId; -use miden_protocol::note::Nullifier; - -use crate::mempool::nodes::{Node, NodeId}; - -/// Tracks the inflight state of the mempool and the [`NodeId`]s associated with each piece of it. -/// -/// This allows it to track the dependency relationships between nodes in the mempool's state DAG by -/// checking which [`NodeId`]s created or consumed the data the node relies on. -/// -/// Note that the user is responsible for ensuring that the inserted nodes adhere to a DAG -/// structure. No attempt is made to verify this internally here as it requires more information -/// than is available at this level. -#[derive(Clone, Debug, PartialEq, Default)] -pub(super) struct InflightState { - /// All nullifiers created by inflight state. - /// - /// This _includes_ nullifiers from erased notes to simplify reverting nodes and requeuing - /// their transactions. If this weren't the case, then its possible that a batch contains an - /// erased note which can clash with another inflight transaction. If we were to revert this - /// batch and requeue its transactions, then this would be illegal. This is possible to handle - /// but would be painful ito bookkeeping. - /// - /// Instead we opt to include erased notes in the tracking here, which allows us to revert the - /// batch and requeue its transactions without having to worry about note clashes. - nullifiers: HashSet, - - /// All created notes and the ID of the node that created it. - /// - /// This _includes_ erased notes, see `nullifiers` for more information. - output_notes: HashMap, - - /// Maps all unauthenticated notes to the node ID that consumed them. 
- /// - /// This can be combined with `output_notes` to infer a parent<->child relationship between - /// nodes i.e. the parent node that created a note and the child node that consumed the note. - /// - /// It is the callers responsibility to ensure that all unauthenticated notes exist as output - /// notes at the time when a new transaction or batch node is inserted. - unauthenticated_notes: HashMap, - - /// All inflight account commitment transitions. - accounts: HashMap, -} - -impl InflightState { - /// Returns all nullifiers which already exist. - pub(super) fn nullifiers_exist( - &self, - nullifiers: impl Iterator, - ) -> Vec { - nullifiers.filter(|nullifier| self.nullifiers.contains(nullifier)).collect() - } - - /// Returns all output note commitments which already exist. - pub(super) fn output_notes_exist(&self, notes: impl Iterator) -> Vec { - notes - .filter(|note_commitment| self.output_notes.contains_key(note_commitment)) - .collect() - } - - /// Returns all output notes which don't exist. - pub(super) fn output_notes_missing( - &self, - note_commitments: impl Iterator, - ) -> Vec { - note_commitments.filter(|note| !self.output_notes.contains_key(note)).collect() - } - - /// The latest account commitment tracked by the inflight state. - /// - /// A [`None`] value _does not_ mean this account doesn't exist at all, but rather that it - /// has no inflight nodes. - pub(super) fn account_commitment(&self, account: &AccountId) -> Option { - self.accounts.get(account).map(AccountUpdates::latest_commitment) - } - - /// Removes all the state of the node from tracking. - /// - /// Note that this simply removes the data and does not check that the data was associated with - /// the [`Node`] at the time of removal. The caller is responsible for ensuring that the given - /// node was still active in the state. 
- pub(super) fn remove(&mut self, node: &dyn Node) { - for nullifier in node.nullifiers() { - assert!( - self.nullifiers.remove(&nullifier), - "Nullifier {nullifier} was not present for removal" - ); - } - - for note in node.output_note_commitments() { - assert!( - self.output_notes.remove(¬e).is_some(), - "Output note {note} was not present for removal" - ); - } - - for note in node.unauthenticated_note_commitments() { - assert!( - self.unauthenticated_notes.remove(¬e).is_some(), - "Unauthenticated note {note} was not present for removal" - ); - } - - for (account, from, to) in node.account_updates() { - let Entry::Occupied(entry) = self - .accounts - .entry(account) - .and_modify(|entry| entry.remove(node.id(), from, to)) - else { - panic!("Account {account} update ({from} -> {to}) was not present for removal"); - }; - - if entry.get().is_empty() { - entry.remove_entry(); - } - } - } - - /// Inserts the node into the state, associating the data with the node's ID. This powers the - /// parent and child relationship lookups. - pub(super) fn insert(&mut self, id: NodeId, node: &dyn Node) { - self.nullifiers.extend(node.nullifiers()); - self.output_notes - .extend(node.output_note_commitments().map(|note_commitment| (note_commitment, id))); - self.unauthenticated_notes.extend( - node.unauthenticated_note_commitments() - .map(|note_commitment| (note_commitment, id)), - ); - - for (account, from, to) in node.account_updates() { - self.accounts.entry(account).or_default().insert(id, from, to); - } - } - - /// The [`NodeId`]s which the given node directly depends on. - /// - /// Note that the result is invalidated by mutating the state. 
- pub(super) fn parents(&self, id: NodeId, node: &dyn Node) -> HashSet { - let note_parents = node - .unauthenticated_note_commitments() - .filter_map(|note| self.output_notes.get(¬e)); - - let account_parents = node - .account_updates() - .filter_map(|(account, from, to)| { - self.accounts.get(&account).map(|account| account.parents(from, to)) - }) - .flatten(); - - account_parents - .chain(note_parents) - .copied() - // Its possible for a node to have internal state connecting to itself. For example, - // a proposed batch has not erased the internally produced and consumed notes. - .filter(|parent| parent != &id) - .collect() - } - - /// The [`NodeId`]s which depend directly on the given node. - /// - /// Note that the result is invalidated by mutating the state. - pub(super) fn children(&self, id: NodeId, node: &dyn Node) -> HashSet { - let note_children = node - .output_note_commitments() - .filter_map(|note| self.unauthenticated_notes.get(¬e)); - - let account_children = node - .account_updates() - .filter_map(|(account, from, to)| { - self.accounts.get(&account).map(|account| account.children(from, to)) - }) - .flatten(); - - account_children - .chain(note_children) - .copied() - // Its possible for a node to have internal state connecting to itself. For example, - // a proposed batch has not erased the internally produced and consumed notes. - .filter(|child| child != &id) - .collect() - } - - pub(super) fn inject_telemetry(&self, span: &tracing::Span) { - use miden_node_utils::tracing::OpenTelemetrySpanExt; - - span.set_attribute("mempool.accounts", self.accounts.len()); - span.set_attribute("mempool.nullifiers", self.nullifiers.len()); - span.set_attribute("mempool.output_notes", self.output_notes.len()); - } -} - -/// The commitment updates made to a account. 
-#[derive(Clone, Debug, PartialEq, Default)] -struct AccountUpdates { - from: HashMap, - to: HashMap, - /// This holds updates from nodes where the initial commitment is the same as the final - /// commitment aka no actual change was made to the account. - /// - /// This sounds counter-intuitive, but is caused by so-called pass-through transactions which - /// use an account at some state `A` but only consume and emit notes without changing the - /// account state itself. - /// - /// These still need to be tracked as part of account updates since they require that an - /// account is in the given state. Since we want these node's to be processed before the - /// account state is changed, this implies that they must be considered children of the - /// non-pass-through node that created the state. Similarly, they must be considered - /// parents of any non-pass-through node that changes to another state as otherwise this - /// node might be processed before the pass-through nodes are. - /// - /// Pass-through nodes with the same state are considered siblings of each as they don't - /// actually depend on each other, and may be processed in any order. - /// - /// Note also that its possible for any node's updates to an account to solely consist of - /// pass-through transactions and therefore in turn is a pass-through node from the perspective - /// of that account. - pass_through: HashMap>, -} - -impl AccountUpdates { - fn latest_commitment(&self) -> Word { - // The latest commitment will be whichever commitment isn't consumed aka a `to` which has - // no `from`. This breaks if this isn't in a valid state. 
- self.to - .keys() - .find(|commitment| !self.from.contains_key(commitment)) - .or(self.pass_through.keys().next()) - .copied() - .unwrap_or_default() - } - - fn is_empty(&self) -> bool { - self.from.is_empty() && self.to.is_empty() && self.pass_through.is_empty() - } - - fn remove(&mut self, id: NodeId, from: Word, to: Word) { - if from == to { - let entry = self.pass_through.entry(from).or_default(); - assert!( - entry.remove(&id), - "Account pass through commitment removal of {from} for {id:?} does not exist" - ); - if entry.is_empty() { - self.pass_through.remove(&from); - } - } else { - let from_removed = self - .from - .remove(&from) - .expect("should only be removing account updates from nodes that are present"); - let to_removed = self - .to - .remove(&to) - .expect("should only be removing account updates from nodes that are present"); - assert_eq!( - from_removed, to_removed, - "Account updates should be removed as a pair with the same node ID" - ); - assert_eq!(from_removed, id, "Account update removal should match the input node ID",); - } - } - - fn insert(&mut self, id: NodeId, from: Word, to: Word) { - if from == to { - assert!( - self.pass_through.entry(from).or_default().insert(id), - "Account already contained the pass through commitment {from} for node {id:?}" - ); - } else { - assert!( - self.from.insert(from, id).is_none(), - "Account already contained the commitment {from} when inserting {id:?}" - ); - assert!( - self.to.insert(to, id).is_none(), - "Account already contained the commitment {to} when inserting {id:?}" - ); - } - } - - /// Returns the node IDs that updated this account's commitment to the given value. - /// - /// Note that this might be multiple IDs due to pass through transactions. When the input - /// is itself a pass through transaction (`from == to`), then its sibling pass through - /// transactions are not considered parents as they are siblings. 
- /// - /// In other words, this returns the IDs of `node` where `node.to == from`. This infers the - /// parent-child relationship where `parent.to == child.from`. - fn parents(&self, from: Word, to: Word) -> impl Iterator { - let direct_parent = self.to.get(&from).into_iter(); - - // If the node query isn't for a pass-through node, then it must also consider pass-through - // nodes at its `from` commitment as parents. - // - // This means the query node depends on the pass-through nodes since these must be processed - // before the account commitment may change. - let pass_through_parents = (from != to) - .then(|| self.pass_through.get(&from).map(HashSet::iter)) - .flatten() - .unwrap_or_default(); - - direct_parent.chain(pass_through_parents) - } - - /// Returns the node ID that consumed the given commitment. - /// - /// Note that this might be multiple IDs due to pass through transactions. When the input - /// is itself a pass through transaction (`from == to`), then its sibling pass through - /// transactions are not considered children as they are siblings. - /// - /// In other words, this returns the ID of `node` where `node.from == to`. This infers the - /// parent-child relationship where `parent.to == child.from`. - fn children(&self, from: Word, to: Word) -> impl Iterator { - let direct_child = self.from.get(&to).into_iter(); - - // If the node query isn't for a pass-through node, then it must also consider pass-through - // nodes at its `to` commitment as children. - // - // This means the pass-through nodes depend on the query node since it changes the account - // commitment to the state required by the pass-through nodes. 
- let pass_through_children = (from != to) - .then(|| self.pass_through.get(&to).map(HashSet::iter)) - .flatten() - .unwrap_or_default(); - - direct_child.chain(pass_through_children) - } -} diff --git a/crates/block-producer/src/mempool/subscription.rs b/crates/block-producer/src/mempool/subscription.rs index 1da3bf03fe..01eaa3fccb 100644 --- a/crates/block-producer/src/mempool/subscription.rs +++ b/crates/block-producer/src/mempool/subscription.rs @@ -9,6 +9,10 @@ use tokio::sync::mpsc; use crate::domain::transaction::AuthenticatedTransaction; +/// Coordinates mempool event delivery for a single subscriber. +/// +/// Retains the active subscriber channel (if any) and an in-memory queue of uncommitted +/// transaction events so new subscriptions can immediately replay pending updates. #[derive(Clone, Debug)] pub(crate) struct SubscriptionProvider { /// The latest event subscription, if any. @@ -24,13 +28,19 @@ pub(crate) struct SubscriptionProvider { /// Tracks all uncommitted transaction events. These events must be resent on start /// of a new subscription since the subscriber will only have data up to the latest - /// committed block and would otherwise miss these uncommiited transactions. + /// committed block and would otherwise miss these uncommitted transactions. /// /// The size is bounded by removing events as they are committed or reverted, and as /// such this is always bound to the current amount of inflight transactions. inflight_txs: InflightTransactions, } +impl PartialEq for SubscriptionProvider { + fn eq(&self, other: &Self) -> bool { + self.chain_tip == other.chain_tip && self.inflight_txs == other.inflight_txs + } +} + impl SubscriptionProvider { pub fn new(chain_tip: BlockNumber) -> Self { Self { @@ -43,6 +53,7 @@ impl SubscriptionProvider { /// Creates a new [`MempoolEvent`] subscription. /// /// This replaces any existing subscription. + /// Any previous subscriber is dropped and must resubscribe to continue receiving events. 
pub fn subscribe(&mut self) -> mpsc::Receiver { // We should leave enough space to at least send the uncommitted events (plus some extra). let capacity = self.inflight_txs.len().mul(2).max(1024); @@ -62,6 +73,8 @@ impl SubscriptionProvider { rx } + /// Records a newly added transaction in the inflight queue and forwards the event to the + /// subscriber. pub(super) fn transaction_added(&mut self, tx: &AuthenticatedTransaction) { let id = tx.id(); let nullifiers = tx.nullifiers().collect(); @@ -87,6 +100,8 @@ impl SubscriptionProvider { Self::send_event(&mut self.subscription, event); } + /// Records a committed block, prunes replayed transactions, and forwards the event so future + /// subscribers continue from the latest chain tip. pub(super) fn block_committed(&mut self, header: BlockHeader, txs: Vec) { self.chain_tip = header.block_num(); for tx in &txs { @@ -99,6 +114,8 @@ impl SubscriptionProvider { ); } + /// Removes reverted transactions from the inflight queue and notifies the subscriber so they + /// can drop or retry the affected items. pub(super) fn txs_reverted(&mut self, txs: HashSet) { for tx in &txs { self.inflight_txs.remove(tx); @@ -108,7 +125,8 @@ impl SubscriptionProvider { /// Sends a [`MempoolEvent`] to the subscriber, if any. /// - /// If the send fails, then the subscription is cancelled. + /// If the send fails, the subscription is cancelled and the event is dropped, so callers must + /// resubscribe to continue receiving updates. /// /// This function does not take `&self` to work-around borrowing issues /// where both the sender and inflight events need to be borrowed at the same time. @@ -131,7 +149,7 @@ impl SubscriptionProvider { /// This is used to track events which need to be sent on fresh subscriptions. /// /// The events can be iterated over in chronological order. 
-#[derive(Default, Clone, Debug)] +#[derive(Default, Clone, Debug, PartialEq)] struct InflightTransactions { /// [`MempoolEvent::TransactionAdded`] events which are still inflight i.e. have not been /// committed or reverted. diff --git a/crates/block-producer/src/mempool/tests.rs b/crates/block-producer/src/mempool/tests.rs index 5cafd0137d..bda75e291c 100644 --- a/crates/block-producer/src/mempool/tests.rs +++ b/crates/block-producer/src/mempool/tests.rs @@ -136,19 +136,20 @@ fn block_commit_reverts_expired_txns() { uut.commit_batch(Arc::new(ProvenBatch::mocked_from_transactions([ tx_to_commit.raw_proven_transaction() ]))); - let (block, _) = uut.select_block(); + let block = uut.select_block(); // A reverted transaction behaves as if it never existed, the current state is the expected // outcome, plus an extra committed block at the end. let mut reference = uut.clone(); // Add a new transaction which will expire when the pending block is committed. - let tx_to_revert = - MockProvenTxBuilder::with_account_index(1).expiration_block_num(block).build(); + let tx_to_revert = MockProvenTxBuilder::with_account_index(1) + .expiration_block_num(block.block_number) + .build(); let tx_to_revert = Arc::new(AuthenticatedTransaction::from_inner(tx_to_revert)); uut.add_transaction(tx_to_revert).unwrap(); // Commit the pending block which should revert the above tx. 
- let arb_header = BlockHeader::mock(block, None, None, &[], Word::empty()); + let arb_header = BlockHeader::mock(block.block_number, None, None, &[], Word::empty()); uut.commit_block(arb_header.clone()); reference.commit_block(arb_header); @@ -160,8 +161,8 @@ fn empty_block_commitment() { let (mut uut, _) = Mempool::for_tests(); for _ in 0..3 { - let (number, _) = uut.select_block(); - let arb_header = BlockHeader::mock(number, None, None, &[], Word::empty()); + let block = uut.select_block(); + let arb_header = BlockHeader::mock(block.block_number, None, None, &[], Word::empty()); uut.commit_block(arb_header); } } @@ -182,6 +183,28 @@ fn cannot_have_multiple_inflight_blocks() { uut.select_block(); } +/// This ensures we've guarded against a batch being marked as proven and then rolled back. +/// +/// This shouldn't be possible in a well behaving system, but if a bug leads to this outcome, +/// then yanking a previously proven batch could result in mempool corruption (since the batch +/// could be in a block). +#[test] +fn rollbacks_of_already_proven_batches_are_ignored() { + let txs = MockProvenTxBuilder::sequential(); + + let (mut uut, _) = Mempool::for_tests(); + uut.add_transaction(txs[0].clone()).unwrap(); + let batch = uut.select_batch().unwrap(); + + let proof = Arc::new(ProvenBatch::mocked_from_transactions([txs[0].raw_proven_transaction()])); + uut.commit_batch(Arc::clone(&proof)); + let reference = uut.clone(); + + uut.rollback_batch(batch.id()); + + assert_eq!(uut, reference); +} + // BLOCK FAILED TESTS // ================================================================================================ @@ -200,7 +223,7 @@ fn block_failure_reverts_its_transactions() { ]))); // Block 1 will contain just the first batch. - let (number, _batches) = uut.select_block(); + let block = uut.select_block(); // Create another dependent batch. 
uut.add_transaction(reverted_txs[1].clone()).unwrap(); @@ -209,31 +232,7 @@ fn block_failure_reverts_its_transactions() { uut.add_transaction(reverted_txs[2].clone()).unwrap(); // Fail the block which should result in everything reverting. - uut.rollback_block(number); - - assert_eq!(uut, reference); -} - -/// Ensures that reverting a subtree removes the node and all its descendents. We test this by -/// comparing against a reference mempool that never had the subtree inserted at all. -#[test] -fn subtree_reversion_removes_all_descendents() { - let (mut uut, mut reference) = Mempool::for_tests(); - - let reverted_txs = MockProvenTxBuilder::sequential(); - - uut.add_transaction(reverted_txs[0].clone()).unwrap(); - uut.select_batch().unwrap(); - - uut.add_transaction(reverted_txs[1].clone()).unwrap(); - let to_revert = uut.select_batch().unwrap(); - - uut.add_transaction(reverted_txs[2].clone()).unwrap(); - uut.revert_subtree(NodeId::ProposedBatch(to_revert.id())); - - // We expect the second batch and the latter reverted txns to be non-existent. 
- reference.add_transaction(reverted_txs[0].clone()).unwrap(); - reference.select_batch().unwrap(); + uut.rollback_block(block.block_number); assert_eq!(uut, reference); } @@ -307,6 +306,7 @@ fn pass_through_txs_on_an_empty_account() { account_update.account_id(), account_update.initial_state_commitment(), account_update.final_state_commitment(), + tx_pass_through_a.store_account_state(), )); itertools::assert_equal(batch.account_updates(), expected); diff --git a/crates/block-producer/src/mempool/tests/add_transaction.rs b/crates/block-producer/src/mempool/tests/add_transaction.rs index 1fc611e4ef..3f45a28438 100644 --- a/crates/block-producer/src/mempool/tests/add_transaction.rs +++ b/crates/block-producer/src/mempool/tests/add_transaction.rs @@ -5,7 +5,7 @@ use miden_protocol::Word; use miden_protocol::block::BlockHeader; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::AddTransactionError; +use crate::errors::{AddTransactionError, StateConflict}; use crate::mempool::Mempool; use crate::test_utils::{MockProvenTxBuilder, mock_account_id}; @@ -68,8 +68,8 @@ mod tx_expiration { // Create at least some locally retained state. let slack = uut.config.expiration_slack; for _ in 0..slack + 10 { - let (number, _) = uut.select_block(); - let header = BlockHeader::mock(number, None, None, &[], Word::default()); + let block = uut.select_block(); + let header = BlockHeader::mock(block.block_number, None, None, &[], Word::default()); uut.commit_block(header); } @@ -140,8 +140,8 @@ mod authentication_height { // Create at least some locally retained state. 
let retention = uut.config.state_retention.get(); for _ in 0..retention + 10 { - let (number, _) = uut.select_block(); - let header = BlockHeader::mock(number, None, None, &[], Word::default()); + let block = uut.select_block(); + let header = BlockHeader::mock(block.block_number, None, None, &[], Word::default()); uut.commit_block(header); } @@ -233,7 +233,10 @@ fn duplicate_nullifiers_are_rejected() { let result = uut.add_transaction(tx_b); // We overlap with one nullifier. - assert_matches!(result, Err(AddTransactionError::InputNotesAlreadyConsumed(..))); + assert_matches!( + result, + Err(AddTransactionError::StateConflict(StateConflict::NullifiersAlreadyExist(..))) + ); } #[test] @@ -263,7 +266,10 @@ fn duplicate_output_notes_are_rejected() { uut.add_transaction(tx_a).unwrap(); let result = uut.add_transaction(tx_b); - assert_matches!(result, Err(AddTransactionError::OutputNotesAlreadyExist(..))); + assert_matches!( + result, + Err(AddTransactionError::StateConflict(StateConflict::OutputNotesAlreadyExist(..))) + ); } #[test] @@ -293,7 +299,12 @@ fn unknown_unauthenticated_notes_are_rejected() { uut.add_transaction(tx_a).unwrap(); let result = uut.add_transaction(tx_b); - assert_matches!(result, Err(AddTransactionError::UnauthenticatedNotesNotFound(..))); + assert_matches!( + result, + Err(AddTransactionError::StateConflict(StateConflict::UnauthenticatedNotesMissing( + .. + ))) + ); } mod account_state { @@ -390,7 +401,12 @@ mod account_state { uut.add_transaction(tx_a).unwrap(); let result = uut.add_transaction(tx_b); - assert_matches!(result, Err(AddTransactionError::IncorrectAccountInitialCommitment { .. })); + assert_matches!( + result, + Err(AddTransactionError::StateConflict( + StateConflict::AccountCommitmentMismatch { .. } + )) + ); } /// Ensures that store state is checked if there is no local mempool state. 
@@ -415,12 +431,18 @@ mod account_state { let tx = Arc::new(tx); let result = uut.add_transaction(tx); - assert_matches!(result, Err(AddTransactionError::IncorrectAccountInitialCommitment { .. })); + assert_matches!( + result, + Err(AddTransactionError::StateConflict( + StateConflict::AccountCommitmentMismatch { .. } + )) + ); } } mod new_account { use super::*; + use crate::errors::StateConflict; #[test] fn is_valid() { @@ -459,7 +481,12 @@ mod new_account { ])); let tx = Arc::new(tx); let result = uut.add_transaction(tx); - assert_matches!(result, Err(AddTransactionError::IncorrectAccountInitialCommitment { .. })); + assert_matches!( + result, + Err(AddTransactionError::StateConflict( + StateConflict::AccountCommitmentMismatch { .. } + )) + ); } #[test] @@ -480,6 +507,11 @@ mod new_account { uut.add_transaction(tx.clone()).unwrap(); let result = uut.add_transaction(tx); - assert_matches!(result, Err(AddTransactionError::IncorrectAccountInitialCommitment { .. })); + assert_matches!( + result, + Err(AddTransactionError::StateConflict( + StateConflict::AccountCommitmentMismatch { .. 
} + )) + ); } } diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 7392a15412..98e642338a 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -29,13 +29,7 @@ use url::Url; use crate::batch_builder::BatchBuilder; use crate::block_builder::BlockBuilder; use crate::domain::transaction::AuthenticatedTransaction; -use crate::errors::{ - AddTransactionError, - BlockProducerError, - StoreError, - SubmitProvenBatchError, - VerifyTxError, -}; +use crate::errors::{AddTransactionError, BlockProducerError, StoreError, SubmitProvenBatchError}; use crate::mempool::{BatchBudget, BlockBudget, Mempool, MempoolConfig, SharedMempool}; use crate::store::StoreClient; use crate::validator::BlockProducerValidatorClient; @@ -331,10 +325,16 @@ impl BlockProducerRpcServer { ); debug!(target: COMPONENT, proof = ?tx.proof()); - let inputs = self.store.get_tx_inputs(&tx).await.map_err(VerifyTxError::from)?; + let inputs = self + .store + .get_tx_inputs(&tx) + .await + .map_err(AddTransactionError::StoreConnectionFailed)?; // SAFETY: we assume that the rpc component has verified the transaction proof already. 
- let tx = AuthenticatedTransaction::new_unchecked(tx, inputs).map(Arc::new)?; + let tx = AuthenticatedTransaction::new_unchecked(tx, inputs) + .map(Arc::new) + .map_err(AddTransactionError::StateConflict)?; self.mempool .lock() diff --git a/crates/proto/src/domain/account.rs b/crates/proto/src/domain/account.rs index 3772241c95..64f52d1a36 100644 --- a/crates/proto/src/domain/account.rs +++ b/crates/proto/src/domain/account.rs @@ -515,11 +515,8 @@ impl TryFrom value: proto::rpc::account_storage_details::AccountStorageMapDetails, ) -> Result { use proto::rpc::account_storage_details::account_storage_map_details::{ - all_map_entries::StorageMapEntry, - map_entries_with_proofs::StorageMapEntryWithProof, - AllMapEntries, - Entries as ProtoEntries, - MapEntriesWithProofs, + AllMapEntries, Entries as ProtoEntries, MapEntriesWithProofs, + all_map_entries::StorageMapEntry, map_entries_with_proofs::StorageMapEntryWithProof, }; let proto::rpc::account_storage_details::AccountStorageMapDetails { diff --git a/crates/proto/src/domain/mempool.rs b/crates/proto/src/domain/mempool.rs index e6b01bd61c..12eba0d1b0 100644 --- a/crates/proto/src/domain/mempool.rs +++ b/crates/proto/src/domain/mempool.rs @@ -10,7 +10,7 @@ use miden_standards::note::AccountTargetNetworkNote; use crate::errors::{ConversionError, MissingFieldHelper}; use crate::generated as proto; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum MempoolEvent { TransactionAdded { id: TransactionId,