Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions crates/payload/builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use reth_primitives_traits::{Recovered, transaction::error::InvalidTransactionEr
use reth_revm::{State, context::Block, database::StateProviderDatabase};
use reth_storage_api::{StateProvider, StateProviderFactory};
use reth_transaction_pool::{
BestTransactions, BestTransactionsAttributes, TransactionPool, ValidPoolTransaction,
error::InvalidPoolTransactionError,
BestTransactionsAttributes, ValidPoolTransaction, error::InvalidPoolTransactionError,
};
use std::{
sync::{
Expand All @@ -53,7 +52,7 @@ use tempo_primitives::{
},
};
use tempo_transaction_pool::{
StateAwareBestTransactions, TempoTransactionPool,
SourcedBestTransactions, StateAwareBestTransactions, TempoTransactionPool,
transaction::{TempoPoolTransactionError, TempoPooledTransaction},
};
use tracing::{Level, debug, debug_span, error, info, instrument, trace, warn};
Expand Down Expand Up @@ -177,7 +176,7 @@ where
) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError> {
self.build_payload(
args,
|attributes| self.pool.best_transactions_with_attributes(attributes),
|attributes| self.pool.best_transactions_with_source(attributes),
false,
)
}
Expand Down Expand Up @@ -230,7 +229,7 @@ where
empty: bool,
) -> Result<BuildOutcome<TempoBuiltPayload>, PayloadBuilderError>
where
Txs: BestTransactions<Item = Arc<ValidPoolTransaction<TempoPooledTransaction>>>,
Txs: SourcedBestTransactions<Item = Arc<ValidPoolTransaction<TempoPooledTransaction>>>,
{
let BuildArguments {
mut cached_reads,
Expand Down Expand Up @@ -445,7 +444,7 @@ where

check_cancel!();

let Some(pool_tx) = best_txs.next() else {
let Some((pool_tx, pool_tx_source)) = best_txs.next_with_source() else {
if build_until_interrupt && cumulative_gas_used < non_shared_gas_limit {
std::thread::sleep(Duration::from_millis(1));
continue;
Expand All @@ -464,12 +463,13 @@ where
if cumulative_gas_used + max_regular_gas_used > non_shared_gas_limit {
// Mark this transaction as invalid since it doesn't fit
// The iterator will handle lane switching internally when appropriate
best_txs.mark_invalid(
best_txs.mark_invalid_with_source(
&pool_tx,
&InvalidPoolTransactionError::ExceedsGasLimit(
pool_tx.gas_limit(),
non_shared_gas_limit - cumulative_gas_used,
),
pool_tx_source,
);
self.metrics
.inc_pool_tx_skipped("exceeds_non_shared_gas_limit");
Expand All @@ -481,11 +481,12 @@ where
if !pool_tx.transaction.is_payment()
&& non_payment_gas_used + max_regular_gas_used > general_gas_limit
{
best_txs.mark_invalid(
best_txs.mark_invalid_with_source(
&pool_tx,
&InvalidPoolTransactionError::Other(Box::new(
TempoPoolTransactionError::ExceedsNonPaymentLimit,
)),
pool_tx_source,
);
self.metrics
.inc_pool_tx_skipped("exceeds_general_gas_limit");
Expand All @@ -507,12 +508,13 @@ where
let estimated_block_size_with_tx = block_size_used + tx_rlp_length;

if is_osaka && estimated_block_size_with_tx > MAX_RLP_BLOCK_SIZE {
best_txs.mark_invalid(
best_txs.mark_invalid_with_source(
&pool_tx,
&InvalidPoolTransactionError::OversizedData {
size: estimated_block_size_with_tx,
limit: MAX_RLP_BLOCK_SIZE,
},
pool_tx_source,
);
self.metrics.inc_pool_tx_skipped("oversized_block");
continue;
Expand Down Expand Up @@ -573,11 +575,12 @@ where
// if the transaction is invalid, we can skip it and all of its
// descendants
trace!(%error, tx = %tx_debug_repr, "skipping invalid transaction and its descendants");
best_txs.mark_invalid(
best_txs.mark_invalid_with_source(
&pool_tx,
&InvalidPoolTransactionError::Consensus(
InvalidTransactionError::TxTypeNotSupported,
),
pool_tx_source,
);
self.metrics.inc_pool_tx_skipped("invalid_tx");
}
Expand Down
184 changes: 168 additions & 16 deletions crates/transaction-pool/src/best.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,52 @@ impl BestPriorityTransactions<CoinbaseTipOrdering<TempoPooledTransaction>>
}
}

/// Tracks which side of a [`MergeBestTransactions`] yielded the last transaction.
#[derive(Debug, Clone, Copy)]
enum MergeSource {
/// Tracks which side of a [`MergeBestTransactions`] yielded a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BestTransactionSource {
/// The transaction came from the left iterator.
Left,
/// The transaction came from the right iterator.
Right,
}

/// A [`BestTransactions`] extension for callers that need to retain the source
/// of a transaction across buffered consumption.
pub trait SourcedBestTransactions: BestTransactions {
/// Source identifier returned with each transaction.
type Source: Copy;

/// Returns the next best transaction and the source iterator that yielded it.
fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)>;

/// Marks a transaction as invalid in the provided source iterator.
fn mark_invalid_with_source(
&mut self,
transaction: &Self::Item,
kind: &InvalidPoolTransactionError,
source: Self::Source,
);
}

impl<T> SourcedBestTransactions for core::iter::Empty<T>
where
core::iter::Empty<T>: BestTransactions<Item = T>,
{
type Source = ();

fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)> {
None
}

fn mark_invalid_with_source(
&mut self,
_transaction: &Self::Item,
_kind: &InvalidPoolTransactionError,
_source: Self::Source,
) {
}
}

/// A [`BestTransactions`] iterator that merges two individual implementations and always yields the next best item from either of the iterators.
pub struct MergeBestTransactions<L, R, T>
where
Expand All @@ -51,7 +90,7 @@ where
right: R,
next_left: Option<(L::Item, Priority<T::PriorityValue>)>,
next_right: Option<(L::Item, Priority<T::PriorityValue>)>,
last_source: Option<MergeSource>,
last_source: Option<BestTransactionSource>,
}

impl<L, R, T> MergeBestTransactions<L, R, T>
Expand Down Expand Up @@ -79,7 +118,9 @@ where
T: TransactionOrdering,
{
/// Returns the next transaction from either the left or the right iterator with the higher priority.
fn next_best(&mut self) -> Option<(L::Item, Priority<T::PriorityValue>)> {
fn next_best(
&mut self,
) -> Option<(L::Item, Priority<T::PriorityValue>, BestTransactionSource)> {
if self.next_left.is_none() {
self.next_left = self.left.next_tx_and_priority();
}
Expand All @@ -94,31 +135,43 @@ where
}
// Only left has an item - take it
(Some(_), None) => {
self.last_source = Some(MergeSource::Left);
self.last_source = Some(BestTransactionSource::Left);
let (item, priority) = self.next_left.take()?;
Some((item, priority))
Some((item, priority, BestTransactionSource::Left))
}
// Only right has an item - take it
(None, Some(_)) => {
self.last_source = Some(MergeSource::Right);
self.last_source = Some(BestTransactionSource::Right);
let (item, priority) = self.next_right.take()?;
Some((item, priority))
Some((item, priority, BestTransactionSource::Right))
}
// Both sides have items - compare priorities and take the higher one
(Some((_, left_priority)), Some((_, right_priority))) => {
// Higher priority value is better
if left_priority >= right_priority {
self.last_source = Some(MergeSource::Left);
self.last_source = Some(BestTransactionSource::Left);
let (item, priority) = self.next_left.take()?;
Some((item, priority))
Some((item, priority, BestTransactionSource::Left))
} else {
self.last_source = Some(MergeSource::Right);
self.last_source = Some(BestTransactionSource::Right);
let (item, priority) = self.next_right.take()?;
Some((item, priority))
Some((item, priority, BestTransactionSource::Right))
}
}
}
}

fn mark_invalid_from_source(
&mut self,
transaction: &L::Item,
kind: &InvalidPoolTransactionError,
source: BestTransactionSource,
) {
match source {
BestTransactionSource::Left => self.left.mark_invalid(transaction, kind),
BestTransactionSource::Right => self.right.mark_invalid(transaction, kind),
}
}
}

impl<L, R, T> Iterator for MergeBestTransactions<L, R, T>
Expand All @@ -130,7 +183,29 @@ where
type Item = L::Item;

fn next(&mut self) -> Option<Self::Item> {
self.next_best().map(|(tx, _)| tx)
self.next_best().map(|(tx, _, _)| tx)
}
}

impl<L, R, T> SourcedBestTransactions for MergeBestTransactions<L, R, T>
where
L: BestPriorityTransactions<T, Item: Send> + Send,
R: BestPriorityTransactions<T, Item = L::Item> + Send,
T: TransactionOrdering,
{
type Source = BestTransactionSource;

fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)> {
self.next_best().map(|(tx, _, source)| (tx, source))
}

fn mark_invalid_with_source(
&mut self,
transaction: &Self::Item,
kind: &InvalidPoolTransactionError,
source: Self::Source,
) {
self.mark_invalid_from_source(transaction, kind, source);
}
}

Expand All @@ -142,8 +217,7 @@ where
{
fn mark_invalid(&mut self, transaction: &Self::Item, kind: &InvalidPoolTransactionError) {
match self.last_source {
Some(MergeSource::Left) => self.left.mark_invalid(transaction, kind),
Some(MergeSource::Right) => self.right.mark_invalid(transaction, kind),
Some(source) => self.mark_invalid_from_source(transaction, kind, source),
None => {
self.left.mark_invalid(transaction, kind);
self.right.mark_invalid(transaction, kind);
Expand Down Expand Up @@ -254,6 +328,51 @@ where
}
}

impl<I> SourcedBestTransactions for StateAwareBestTransactions<I>
where
I: SourcedBestTransactions<Item = Arc<ValidPoolTransaction<TempoPooledTransaction>>> + Send,
{
type Source = I::Source;

fn next_with_source(&mut self) -> Option<(Self::Item, Self::Source)> {
loop {
let (tx, source) = self.inner.next_with_source()?;

let Some(key) = tx.transaction.fee_balance_slot() else {
debug_assert!(false, "pool transaction must have cached fee_balance_slot");
continue;
};

if let Some(&balance) = self.decreased_balances.get(&key)
&& balance < tx.transaction.fee_token_cost()
{
self.inner.mark_invalid_with_source(
&tx,
&InvalidPoolTransactionError::Consensus(
InvalidTransactionError::InsufficientFunds(
(balance, tx.transaction.fee_token_cost()).into(),
),
),
source,
);
continue;
}

return Some((tx, source));
}
}

fn mark_invalid_with_source(
&mut self,
transaction: &Self::Item,
kind: &InvalidPoolTransactionError,
source: Self::Source,
) {
self.inner
.mark_invalid_with_source(transaction, kind, source);
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -484,4 +603,37 @@ mod tests {
assert_eq!(merged.next(), Some("L2"));
assert_eq!(merged.next(), None);
}

#[test]
fn test_mark_invalid_with_source_handles_buffered_transactions() {
let left = MockBestTransactions::new(vec![("L1", 9), ("L2", 3)]);
let right = MockBestTransactions::new(vec![("R1", 10)]);

let left_invalidated = left.invalidated();
let right_invalidated = right.invalidated();

let mut merged = MergeBestTransactions::new(left, right);

let (first_tx, first_source) = merged.next_with_source().unwrap();
assert_eq!(first_tx, "R1");
assert_eq!(first_source, BestTransactionSource::Right);

let (second_tx, second_source) = merged.next_with_source().unwrap();
assert_eq!(second_tx, "L1");
assert_eq!(second_source, BestTransactionSource::Left);

let kind =
InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported);
merged.mark_invalid_with_source(&first_tx, &kind, first_source);

assert!(
left_invalidated.lock().unwrap().is_empty(),
"left pool must NOT be invalidated for a buffered right-side tx"
);
assert_eq!(
*right_invalidated.lock().unwrap(),
vec!["R1"],
"right pool must receive invalidation even after later prefetches"
);
}
}
2 changes: 1 addition & 1 deletion crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod metrics;
pub mod paused;
pub mod tt_2d_pool;

pub use best::StateAwareBestTransactions;
pub use best::{BestTransactionSource, SourcedBestTransactions, StateAwareBestTransactions};
pub use maintain::TempoPoolUpdates;

pub use metrics::{AA2dPoolMetrics, TempoPoolMaintenanceMetrics};
Expand Down
Loading
Loading