From b050937a59a9f2b83f9dea933df343845d24ef0a Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 6 May 2026 15:43:27 +0000 Subject: [PATCH] fix: source invalidations from buffered best txs --- crates/payload/builder/src/lib.rs | 23 +-- crates/transaction-pool/src/best.rs | 184 ++++++++++++++++++++-- crates/transaction-pool/src/lib.rs | 2 +- crates/transaction-pool/src/tempo_pool.rs | 20 ++- 4 files changed, 200 insertions(+), 29 deletions(-) diff --git a/crates/payload/builder/src/lib.rs b/crates/payload/builder/src/lib.rs index 1e78f729af..3544828f9c 100644 --- a/crates/payload/builder/src/lib.rs +++ b/crates/payload/builder/src/lib.rs @@ -29,8 +29,7 @@ use reth_primitives_traits::{Recovered, transaction::error::InvalidTransactionEr use reth_revm::{State, context::Block, database::StateProviderDatabase}; use reth_storage_api::{StateProvider, StateProviderFactory}; use reth_transaction_pool::{ - BestTransactions, BestTransactionsAttributes, TransactionPool, ValidPoolTransaction, - error::InvalidPoolTransactionError, + BestTransactionsAttributes, ValidPoolTransaction, error::InvalidPoolTransactionError, }; use std::{ sync::{ @@ -53,7 +52,7 @@ use tempo_primitives::{ }, }; use tempo_transaction_pool::{ - StateAwareBestTransactions, TempoTransactionPool, + SourcedBestTransactions, StateAwareBestTransactions, TempoTransactionPool, transaction::{TempoPoolTransactionError, TempoPooledTransaction}, }; use tracing::{Level, debug, debug_span, error, info, instrument, trace, warn}; @@ -177,7 +176,7 @@ where ) -> Result, PayloadBuilderError> { self.build_payload( args, - |attributes| self.pool.best_transactions_with_attributes(attributes), + |attributes| self.pool.best_transactions_with_source(attributes), false, ) } @@ -230,7 +229,7 @@ where empty: bool, ) -> Result, PayloadBuilderError> where - Txs: BestTransactions>>, + Txs: SourcedBestTransactions>>, { let BuildArguments { mut cached_reads, @@ -445,7 +444,7 @@ where check_cancel!(); - let Some(pool_tx) = best_txs.next() else { + let Some((pool_tx, pool_tx_source)) = best_txs.next_with_source() else { if build_until_interrupt && cumulative_gas_used < non_shared_gas_limit { std::thread::sleep(Duration::from_millis(1)); continue; @@ -464,12 +463,13 @@ where if cumulative_gas_used + max_regular_gas_used > non_shared_gas_limit { // Mark this transaction as invalid since it doesn't fit // The iterator will handle lane switching internally when appropriate - best_txs.mark_invalid( + best_txs.mark_invalid_with_source( &pool_tx, &InvalidPoolTransactionError::ExceedsGasLimit( pool_tx.gas_limit(), non_shared_gas_limit - cumulative_gas_used, ), + pool_tx_source, ); self.metrics .inc_pool_tx_skipped("exceeds_non_shared_gas_limit"); @@ -481,11 +481,12 @@ where if !pool_tx.transaction.is_payment() && non_payment_gas_used + max_regular_gas_used > general_gas_limit { - best_txs.mark_invalid( + best_txs.mark_invalid_with_source( &pool_tx, &InvalidPoolTransactionError::Other(Box::new( TempoPoolTransactionError::ExceedsNonPaymentLimit, )), + pool_tx_source, ); self.metrics .inc_pool_tx_skipped("exceeds_general_gas_limit"); @@ -507,12 +508,13 @@ where let estimated_block_size_with_tx = block_size_used + tx_rlp_length; if is_osaka && estimated_block_size_with_tx > MAX_RLP_BLOCK_SIZE { - best_txs.mark_invalid( + best_txs.mark_invalid_with_source( &pool_tx, &InvalidPoolTransactionError::OversizedData { size: estimated_block_size_with_tx, limit: MAX_RLP_BLOCK_SIZE, }, + pool_tx_source, ); self.metrics.inc_pool_tx_skipped("oversized_block"); continue; @@ -573,11 +575,12 @@ where // if the transaction is invalid, we can skip it and all of its // descendants trace!(%error, tx = %tx_debug_repr, "skipping invalid transaction and its descendants"); - best_txs.mark_invalid( + best_txs.mark_invalid_with_source( &pool_tx, &InvalidPoolTransactionError::Consensus( InvalidTransactionError::TxTypeNotSupported, ), + pool_tx_source, ); self.metrics.inc_pool_tx_skipped("invalid_tx"); } diff --git a/crates/transaction-pool/src/best.rs b/crates/transaction-pool/src/best.rs index 3f80215ab6..e16048c4c3 100644 --- a/crates/transaction-pool/src/best.rs +++ b/crates/transaction-pool/src/best.rs @@ -33,13 +33,52 @@ impl BestPriorityTransactions> } } -/// Tracks which side of a [`MergeBestTransactions`] yielded the last transaction. -#[derive(Debug, Clone, Copy)] -enum MergeSource { +/// Tracks which side of a [`MergeBestTransactions`] yielded a transaction. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BestTransactionSource { + /// The transaction came from the left iterator. Left, + /// The transaction came from the right iterator. Right, } +/// A [`BestTransactions`] extension for callers that need to retain the source +/// of a transaction across buffered consumption. +pub trait SourcedBestTransactions: BestTransactions { + /// Source identifier returned with each transaction. + type Source: Copy; + + /// Returns the next best transaction and the source iterator that yielded it. + fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)>; + + /// Marks a transaction as invalid in the provided source iterator. + fn mark_invalid_with_source( + &mut self, + transaction: &Self::Item, + kind: &InvalidPoolTransactionError, + source: Self::Source, + ); +} + +impl SourcedBestTransactions for core::iter::Empty +where + core::iter::Empty: BestTransactions, +{ + type Source = (); + + fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)> { + None + } + + fn mark_invalid_with_source( + &mut self, + _transaction: &Self::Item, + _kind: &InvalidPoolTransactionError, + _source: Self::Source, + ) { + } +} + /// A [`BestTransactions`] iterator that merges two individual implementations and always yields the next best item from either of the iterators. pub struct MergeBestTransactions where @@ -51,7 +90,7 @@ where right: R, next_left: Option<(L::Item, Priority)>, next_right: Option<(L::Item, Priority)>, - last_source: Option, + last_source: Option, } impl MergeBestTransactions @@ -79,7 +118,9 @@ where T: TransactionOrdering, { /// Returns the next transaction from either the left or the right iterator with the higher priority. - fn next_best(&mut self) -> Option<(L::Item, Priority)> { + fn next_best( + &mut self, + ) -> Option<(L::Item, Priority, BestTransactionSource)> { if self.next_left.is_none() { self.next_left = self.left.next_tx_and_priority(); } @@ -94,31 +135,43 @@ where } // Only left has an item - take it (Some(_), None) => { - self.last_source = Some(MergeSource::Left); + self.last_source = Some(BestTransactionSource::Left); let (item, priority) = self.next_left.take()?; - Some((item, priority)) + Some((item, priority, BestTransactionSource::Left)) } // Only right has an item - take it (None, Some(_)) => { - self.last_source = Some(MergeSource::Right); + self.last_source = Some(BestTransactionSource::Right); let (item, priority) = self.next_right.take()?; - Some((item, priority)) + Some((item, priority, BestTransactionSource::Right)) } // Both sides have items - compare priorities and take the higher one (Some((_, left_priority)), Some((_, right_priority))) => { // Higher priority value is better if left_priority >= right_priority { - self.last_source = Some(MergeSource::Left); + self.last_source = Some(BestTransactionSource::Left); let (item, priority) = self.next_left.take()?; - Some((item, priority)) + Some((item, priority, BestTransactionSource::Left)) } else { - self.last_source = Some(MergeSource::Right); + self.last_source = Some(BestTransactionSource::Right); let (item, priority) = self.next_right.take()?; - Some((item, priority)) + Some((item, priority, BestTransactionSource::Right)) } } } } + + fn mark_invalid_from_source( + &mut self, + transaction: &L::Item, + kind: &InvalidPoolTransactionError, + source: BestTransactionSource, + ) { + match source { + BestTransactionSource::Left => self.left.mark_invalid(transaction, kind), + BestTransactionSource::Right => self.right.mark_invalid(transaction, kind), + } + } } impl Iterator for MergeBestTransactions @@ -130,7 +183,29 @@ where type Item = L::Item; fn next(&mut self) -> Option { - self.next_best().map(|(tx, _)| tx) + self.next_best().map(|(tx, _, _)| tx) + } +} + +impl SourcedBestTransactions for MergeBestTransactions +where + L: BestPriorityTransactions + Send, + R: BestPriorityTransactions + Send, + T: TransactionOrdering, +{ + type Source = BestTransactionSource; + + fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)> { + self.next_best().map(|(tx, _, source)| (tx, source)) + } + + fn mark_invalid_with_source( + &mut self, + transaction: &Self::Item, + kind: &InvalidPoolTransactionError, + source: Self::Source, + ) { + self.mark_invalid_from_source(transaction, kind, source); } } @@ -142,8 +217,7 @@ where { fn mark_invalid(&mut self, transaction: &Self::Item, kind: &InvalidPoolTransactionError) { match self.last_source { - Some(MergeSource::Left) => self.left.mark_invalid(transaction, kind), - Some(MergeSource::Right) => self.right.mark_invalid(transaction, kind), + Some(source) => self.mark_invalid_from_source(transaction, kind, source), None => { self.left.mark_invalid(transaction, kind); self.right.mark_invalid(transaction, kind); @@ -254,6 +328,51 @@ where } } +impl SourcedBestTransactions for StateAwareBestTransactions +where + I: SourcedBestTransactions>> + Send, +{ + type Source = I::Source; + + fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)> { + loop { + let (tx, source) = self.inner.next_with_source()?; + + let Some(key) = tx.transaction.fee_balance_slot() else { + debug_assert!(false, "pool transaction must have cached fee_balance_slot"); + continue; + }; + + if let Some(&balance) = self.decreased_balances.get(&key) + && balance < tx.transaction.fee_token_cost() + { + self.inner.mark_invalid_with_source( + &tx, + &InvalidPoolTransactionError::Consensus( + InvalidTransactionError::InsufficientFunds( + (balance, tx.transaction.fee_token_cost()).into(), + ), + ), + source, + ); + continue; + } + + return Some((tx, source)); + } + } + + fn mark_invalid_with_source( + &mut self, + transaction: &Self::Item, + kind: &InvalidPoolTransactionError, + source: Self::Source, + ) { + self.inner + .mark_invalid_with_source(transaction, kind, source); + } +} + #[cfg(test)] mod tests { use super::*; @@ -484,4 +603,37 @@ mod tests { assert_eq!(merged.next(), Some("L2")); assert_eq!(merged.next(), None); } + + #[test] + fn test_mark_invalid_with_source_handles_buffered_transactions() { + let left = MockBestTransactions::new(vec![("L1", 9), ("L2", 3)]); + let right = MockBestTransactions::new(vec![("R1", 10)]); + + let left_invalidated = left.invalidated(); + let right_invalidated = right.invalidated(); + + let mut merged = MergeBestTransactions::new(left, right); + + let (first_tx, first_source) = merged.next_with_source().unwrap(); + assert_eq!(first_tx, "R1"); + assert_eq!(first_source, BestTransactionSource::Right); + + let (second_tx, second_source) = merged.next_with_source().unwrap(); + assert_eq!(second_tx, "L1"); + assert_eq!(second_source, BestTransactionSource::Left); + + let kind = + InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported); + merged.mark_invalid_with_source(&first_tx, &kind, first_source); + + assert!( + left_invalidated.lock().unwrap().is_empty(), + "left pool must NOT be invalidated for a buffered right-side tx" + ); + assert_eq!( + *right_invalidated.lock().unwrap(), + vec!["R1"], + "right pool must receive invalidation even after later prefetches" + ); + } } diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 3d3e494cc1..7d737436ec 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -21,7 +21,7 @@ pub mod metrics; pub mod paused; pub mod tt_2d_pool; -pub use best::StateAwareBestTransactions; +pub use best::{BestTransactionSource, SourcedBestTransactions, StateAwareBestTransactions}; pub use maintain::TempoPoolUpdates; pub use metrics::{AA2dPoolMetrics, TempoPoolMaintenanceMetrics}; diff --git a/crates/transaction-pool/src/tempo_pool.rs b/crates/transaction-pool/src/tempo_pool.rs index 08fc60bbd6..7101ab14b3 100644 --- a/crates/transaction-pool/src/tempo_pool.rs +++ b/crates/transaction-pool/src/tempo_pool.rs @@ -3,8 +3,11 @@ // Routes user nonces (nonce_key>0) to minimal 2D nonce pool use crate::{ - amm::AmmLiquidityCache, best::MergeBestTransactions, transaction::TempoPooledTransaction, - tt_2d_pool::AA2dPool, validator::TempoTransactionValidator, + amm::AmmLiquidityCache, + best::{BestTransactionSource, MergeBestTransactions, SourcedBestTransactions}, + transaction::TempoPooledTransaction, + tt_2d_pool::AA2dPool, + validator::TempoTransactionValidator, }; use alloy_consensus::Transaction; use alloy_primitives::{ @@ -88,6 +91,19 @@ where self.protocol_pool.validator().validator().client() } + /// Returns the best transactions with their source pool. + pub fn best_transactions_with_source( + &self, + _attributes: BestTransactionsAttributes, + ) -> impl SourcedBestTransactions< + Item = Arc>, + Source = BestTransactionSource, + > { + let left = self.protocol_pool.inner().best_transactions(); + let right = self.aa_2d_pool.read().best_transactions(); + MergeBestTransactions::new(left, right) + } + /// Updates the 2d nonce pool with the given state changes. pub(crate) fn notify_aa_pool_on_state_updates(&self, state: &AddressMap) { let (promoted, _mined) = self.aa_2d_pool.write().on_state_updates(state);