|
1 | 1 | //! Transaction service responsible for fetching and sending trasnsactions to the simulator. |
2 | 2 | use crate::config::BuilderConfig; |
3 | | -use alloy::consensus::TxEnvelope; |
| 3 | +use alloy::{ |
| 4 | + consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, |
| 5 | + providers::Provider, |
| 6 | +}; |
4 | 7 | use eyre::Error; |
5 | 8 | use reqwest::{Client, Url}; |
6 | 9 | use serde::{Deserialize, Serialize}; |
7 | 10 | use std::time::Duration; |
8 | 11 | use tokio::{sync::mpsc, task::JoinHandle, time}; |
9 | | -use tracing::{Instrument, debug, debug_span, trace}; |
| 12 | +use tracing::{Instrument, debug, debug_span, info_span, trace}; |
10 | 13 |
|
11 | 14 | /// Poll interval for the transaction poller in milliseconds. |
12 | 15 | const POLL_INTERVAL_MS: u64 = 1000; |
@@ -56,6 +59,45 @@ impl TxPoller { |
56 | 59 | Duration::from_millis(self.poll_interval_ms) |
57 | 60 | } |
58 | 61 |
|
| 62 | + // Spawn a tokio task to check the nonce of a transaction before sending |
| 63 | + // it to the cachetask via the outbound channel. |
| 64 | + fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<TxEnvelope>) { |
| 65 | + tokio::spawn(async move { |
| 66 | + let span = info_span!("check_nonce", tx_id = %tx.tx_hash()); |
| 67 | + |
| 68 | + let Ok(ru_provider) = |
| 69 | + crate::config().connect_ru_provider().instrument(span.clone()).await |
| 70 | + else { |
| 71 | + span_warn!(span, "Failed to connect to RU provider, stopping noncecheck task."); |
| 72 | + return; |
| 73 | + }; |
| 74 | + |
| 75 | + let Ok(sender) = tx.recover_signer() else { |
| 76 | + span_warn!(span, "Failed to recover sender from transaction"); |
| 77 | + return; |
| 78 | + }; |
| 79 | + |
| 80 | + let Ok(tx_count) = ru_provider |
| 81 | + .get_transaction_count(sender) |
| 82 | + .into_future() |
| 83 | + .instrument(span.clone()) |
| 84 | + .await |
| 85 | + else { |
| 86 | + span_warn!(span, %sender, "Failed to fetch nonce for sender"); |
| 87 | + return; |
| 88 | + }; |
| 89 | + |
| 90 | + if tx.nonce() < tx_count { |
| 91 | + span_debug!(span, %sender, tx_nonce = %tx.nonce(), ru_nonce = %tx_count, "Dropping transaction with stale nonce"); |
| 92 | + return; |
| 93 | + } |
| 94 | + |
| 95 | + if outbound.send(tx).is_err() { |
| 96 | + span_warn!(span, "Outbound channel closed, stopping NonceChecker task."); |
| 97 | + } |
| 98 | + }); |
| 99 | + } |
| 100 | + |
59 | 101 | /// Polls the transaction cache for transactions. |
60 | 102 | pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> { |
61 | 103 | let url: Url = self.config.tx_pool_url.join("transactions")?; |
@@ -94,11 +136,7 @@ impl TxPoller { |
94 | 136 | let _guard = span.entered(); |
95 | 137 | debug!(count = ?transactions.len(), "found transactions"); |
96 | 138 | for tx in transactions.into_iter() { |
97 | | - if outbound.send(tx).is_err() { |
98 | | - // If there are no receivers, we can shut down |
99 | | - trace!("No receivers left, shutting down"); |
100 | | - break; |
101 | | - } |
| 139 | + self.spawn_check_nonce(tx, outbound.clone()); |
102 | 140 | } |
103 | 141 | } |
104 | 142 |
|
|
0 commit comments