Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 126 additions & 86 deletions crates/op-rbuilder/src/builder/best_txs.rs
Original file line number Diff line number Diff line change
@@ -1,65 +1,81 @@
use alloy_primitives::{Address, TxHash};
use reth_payload_util::PayloadTransactions;
use reth_payload_util::{BestPayloadTransactions, PayloadTransactions};
use reth_transaction_pool::{PoolTransaction, ValidPoolTransaction};
use std::{collections::HashSet, sync::Arc};
use tracing::debug;

use crate::tx::MaybeFlashblockFilter;

pub(super) struct BestFlashblocksTxs<T, I>
#[derive(Debug, Default)]
pub(super) struct FlashblockCommittedTxs {
// Transactions that were already committed to the state. Using them again would cause
// NonceTooLow, so we skip them in later flashblocks.
committed_transactions: HashSet<TxHash>,
}

impl FlashblockCommittedTxs {
pub(super) fn contains(&self, tx_hash: &TxHash) -> bool {
self.committed_transactions.contains(tx_hash)
}

pub(super) fn mark_committed(&mut self, txs: Vec<TxHash>) {
self.committed_transactions.extend(txs);
}
}

pub(super) struct FlashblockPoolTxCursor<'a, T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
inner: reth_payload_util::BestPayloadTransactions<T, I>,
inner: Option<BestPayloadTransactions<T, I>>,
current_flashblock_number: u64,
// Transactions that were already commited to the state. Using them again would cause NonceTooLow
// so we skip them
commited_transactions: HashSet<TxHash>,
committed_txs: &'a mut FlashblockCommittedTxs,
}

impl<T, I> BestFlashblocksTxs<T, I>
impl<'a, T, I> FlashblockPoolTxCursor<'a, T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
pub(super) fn new(inner: reth_payload_util::BestPayloadTransactions<T, I>) -> Self {
pub(super) fn new(committed_txs: &'a mut FlashblockCommittedTxs) -> Self {
Self {
inner,
inner: None,
current_flashblock_number: 0,
commited_transactions: Default::default(),
committed_txs,
}
}

/// Replaces current iterator with new one. We use it on new flashblock building, to refresh
/// priority boundaries
pub(super) fn refresh_iterator(
&mut self,
inner: reth_payload_util::BestPayloadTransactions<T, I>,
inner: BestPayloadTransactions<T, I>,
current_flashblock_number: u64,
) {
self.inner = inner;
self.inner = Some(inner);
self.current_flashblock_number = current_flashblock_number;
}

/// Remove transaction from next iteration and it already in the state
pub(super) fn mark_commited(&mut self, txs: Vec<TxHash>) {
self.commited_transactions.extend(txs);
pub(super) fn mark_committed(&mut self, txs: Vec<TxHash>) {
self.committed_txs.mark_committed(txs);
}
}

impl<T, I> PayloadTransactions for BestFlashblocksTxs<T, I>
impl<T, I> PayloadTransactions for FlashblockPoolTxCursor<'_, T, I>
where
T: PoolTransaction + MaybeFlashblockFilter,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
type Transaction = T;

fn next(&mut self, ctx: ()) -> Option<Self::Transaction> {
let inner = self.inner.as_mut()?;
loop {
let tx = self.inner.next(ctx)?;
let tx = inner.next(ctx)?;
// Skip transaction we already included
if self.commited_transactions.contains(tx.hash()) {
if self.committed_txs.contains(tx.hash()) {
continue;
}

Expand All @@ -86,7 +102,7 @@ where
max_flashblock = max,
"Bundle flashblock max exceeded"
);
self.inner.mark_invalid(tx.sender(), tx.nonce());
inner.mark_invalid(tx.sender(), tx.nonce());
continue;
}

Expand All @@ -96,14 +112,16 @@ where

/// Proxy to inner iterator
fn mark_invalid(&mut self, sender: Address, nonce: u64) {
self.inner.mark_invalid(sender, nonce);
if let Some(inner) = self.inner.as_mut() {
inner.mark_invalid(sender, nonce);
}
}
}

#[cfg(test)]
mod tests {
use crate::{
builder::best_txs::BestFlashblocksTxs,
builder::best_txs::{FlashblockCommittedTxs, FlashblockPoolTxCursor},
mock_tx::{MockFbTransaction, MockFbTransactionFactory},
};
use alloy_consensus::Transaction;
Expand All @@ -124,35 +142,43 @@ mod tests {
pool.add_transaction(Arc::new(tx_2), 0);
pool.add_transaction(Arc::new(tx_3), 0);

// Create iterator
let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best()));
let mut committed = FlashblockCommittedTxs::default();
// ### First flashblock
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
// Accept first tx
let tx1 = iterator.next(()).unwrap();
// Invalidate second tx
let tx2 = iterator.next(()).unwrap();
iterator.mark_invalid(tx2.sender(), tx2.nonce());
// Accept third tx
let tx3 = iterator.next(()).unwrap();
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
// Mark transaction as commited
iterator.mark_commited(vec![*tx1.hash(), *tx3.hash()]);
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
// Accept first tx
let tx1 = cursor.next(()).unwrap();
// Invalidate second tx
let tx2 = cursor.next(()).unwrap();
cursor.mark_invalid(tx2.sender(), tx2.nonce());
// Accept third tx
let tx3 = cursor.next(()).unwrap();
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
// Mark transaction as committed
cursor.mark_committed(vec![*tx1.hash(), *tx3.hash()]);
}

// ### Second flashblock
// It should not return txs 1 and 3, but should return 2
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
let tx2 = iterator.next(()).unwrap();
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
// Mark transaction as commited
iterator.mark_commited(vec![*tx2.hash()]);
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
let tx2 = cursor.next(()).unwrap();
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
// Mark transaction as committed
cursor.mark_committed(vec![*tx2.hash()]);
}

// ### Third flashblock
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
}
}

/// Test bundle cases
Expand All @@ -176,62 +202,76 @@ mod tests {
pool.add_transaction(Arc::new(tx_3), 0);
pool.add_transaction(Arc::new(tx_4), 0);

// Create iterator
let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best()));
let mut committed = FlashblockCommittedTxs::default();
// ### First flashblock
// should contain txs 1 and 2
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx2 = iterator.next(()).unwrap();
assert_eq!(tx2.hash(), &tx_2_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
let tx1 = cursor.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx2 = cursor.next(()).unwrap();
assert_eq!(tx2.hash(), &tx_2_hash);
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
}

// ### Second flashblock
// should contain txs 1, 2, and 3
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx2 = iterator.next(()).unwrap();
assert_eq!(tx2.hash(), &tx_2_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
let tx1 = cursor.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx2 = cursor.next(()).unwrap();
assert_eq!(tx2.hash(), &tx_2_hash);
let tx3 = cursor.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
}

// ### Third flashblock
// should contain txs 1, 3, and 4
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
let tx4 = iterator.next(()).unwrap();
assert_eq!(tx4.hash(), &tx_4_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
let tx1 = cursor.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = cursor.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
let tx4 = cursor.next(()).unwrap();
assert_eq!(tx4.hash(), &tx_4_hash);
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
}

// ### Forth flashblock
// should contain txs 1, 3, and 4
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 3);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
let tx4 = iterator.next(()).unwrap();
assert_eq!(tx4.hash(), &tx_4_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 3);
let tx1 = cursor.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = cursor.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
let tx4 = cursor.next(()).unwrap();
assert_eq!(tx4.hash(), &tx_4_hash);
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
}

// ### Fifth flashblock
// should contain txs 1 and 3
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 4);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
{
let mut cursor = FlashblockPoolTxCursor::new(&mut committed);
cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 4);
let tx1 = cursor.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = cursor.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
// Check that it's empty
assert!(cursor.next(()).is_none(), "Iterator should be empty");
}
}
}
Loading
Loading