From 8b261930c61289417fc6a2cffd2e365e034d6d69 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Mon, 9 Mar 2026 23:40:25 +0800 Subject: [PATCH 1/7] feat: async payload builder --- crates/op-rbuilder/src/builder/best_txs.rs | 212 ++++++---- crates/op-rbuilder/src/builder/generator.rs | 366 +++++++++-------- crates/op-rbuilder/src/builder/payload.rs | 420 ++++++++++++++------ crates/op-rbuilder/src/builder/service.rs | 1 + crates/op-rbuilder/src/builder/timing.rs | 7 +- 5 files changed, 625 insertions(+), 381 deletions(-) diff --git a/crates/op-rbuilder/src/builder/best_txs.rs b/crates/op-rbuilder/src/builder/best_txs.rs index c6909f31..a226e312 100644 --- a/crates/op-rbuilder/src/builder/best_txs.rs +++ b/crates/op-rbuilder/src/builder/best_txs.rs @@ -1,33 +1,48 @@ use alloy_primitives::{Address, TxHash}; -use reth_payload_util::PayloadTransactions; +use reth_payload_util::{BestPayloadTransactions, PayloadTransactions}; use reth_transaction_pool::{PoolTransaction, ValidPoolTransaction}; use std::{collections::HashSet, sync::Arc}; use tracing::debug; use crate::tx::MaybeFlashblockFilter; -pub(super) struct BestFlashblocksTxs +#[derive(Debug, Default)] +pub(super) struct FlashblockCommittedTxs { + // Transactions that were already committed to the state. Using them again would cause + // NonceTooLow, so we skip them in later flashblocks. + committed_transactions: HashSet, +} + +impl FlashblockCommittedTxs { + pub(super) fn contains(&self, tx_hash: &TxHash) -> bool { + self.committed_transactions.contains(tx_hash) + } + + pub(super) fn mark_committed(&mut self, txs: Vec) { + self.committed_transactions.extend(txs); + } +} + +pub(super) struct FlashblockPoolTxCursor<'a, T, I> where T: PoolTransaction, I: Iterator>>, { - inner: reth_payload_util::BestPayloadTransactions, + inner: Option>, current_flashblock_number: u64, - // Transactions that were already commited to the state. 
Using them again would cause NonceTooLow - // so we skip them - commited_transactions: HashSet, + committed_txs: &'a mut FlashblockCommittedTxs, } -impl BestFlashblocksTxs +impl<'a, T, I> FlashblockPoolTxCursor<'a, T, I> where T: PoolTransaction, I: Iterator>>, { - pub(super) fn new(inner: reth_payload_util::BestPayloadTransactions) -> Self { + pub(super) fn new(committed_txs: &'a mut FlashblockCommittedTxs) -> Self { Self { - inner, + inner: None, current_flashblock_number: 0, - commited_transactions: Default::default(), + committed_txs, } } @@ -35,20 +50,20 @@ where /// priority boundaries pub(super) fn refresh_iterator( &mut self, - inner: reth_payload_util::BestPayloadTransactions, + inner: BestPayloadTransactions, current_flashblock_number: u64, ) { - self.inner = inner; + self.inner = Some(inner); self.current_flashblock_number = current_flashblock_number; } /// Remove transaction from next iteration and it already in the state - pub(super) fn mark_commited(&mut self, txs: Vec) { - self.commited_transactions.extend(txs); + pub(super) fn mark_committed(&mut self, txs: Vec) { + self.committed_txs.mark_committed(txs); } } -impl PayloadTransactions for BestFlashblocksTxs +impl PayloadTransactions for FlashblockPoolTxCursor<'_, T, I> where T: PoolTransaction + MaybeFlashblockFilter, I: Iterator>>, @@ -56,10 +71,11 @@ where type Transaction = T; fn next(&mut self, ctx: ()) -> Option { + let inner = self.inner.as_mut()?; loop { - let tx = self.inner.next(ctx)?; + let tx = inner.next(ctx)?; // Skip transaction we already included - if self.commited_transactions.contains(tx.hash()) { + if self.committed_txs.contains(tx.hash()) { continue; } @@ -86,7 +102,7 @@ where max_flashblock = max, "Bundle flashblock max exceeded" ); - self.inner.mark_invalid(tx.sender(), tx.nonce()); + inner.mark_invalid(tx.sender(), tx.nonce()); continue; } @@ -96,14 +112,16 @@ where /// Proxy to inner iterator fn mark_invalid(&mut self, sender: Address, nonce: u64) { - 
self.inner.mark_invalid(sender, nonce); + if let Some(inner) = self.inner.as_mut() { + inner.mark_invalid(sender, nonce); + } } } #[cfg(test)] mod tests { use crate::{ - builder::best_txs::BestFlashblocksTxs, + builder::best_txs::{FlashblockCommittedTxs, FlashblockPoolTxCursor}, mock_tx::{MockFbTransaction, MockFbTransactionFactory}, }; use alloy_consensus::Transaction; @@ -124,35 +142,43 @@ mod tests { pool.add_transaction(Arc::new(tx_2), 0); pool.add_transaction(Arc::new(tx_3), 0); - // Create iterator - let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best())); + let mut committed = FlashblockCommittedTxs::default(); // ### First flashblock - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0); - // Accept first tx - let tx1 = iterator.next(()).unwrap(); - // Invalidate second tx - let tx2 = iterator.next(()).unwrap(); - iterator.mark_invalid(tx2.sender(), tx2.nonce()); - // Accept third tx - let tx3 = iterator.next(()).unwrap(); - // Check that it's empty - assert!(iterator.next(()).is_none(), "Iterator should be empty"); - // Mark transaction as commited - iterator.mark_commited(vec![*tx1.hash(), *tx3.hash()]); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0); + // Accept first tx + let tx1 = cursor.next(()).unwrap(); + // Invalidate second tx + let tx2 = cursor.next(()).unwrap(); + cursor.mark_invalid(tx2.sender(), tx2.nonce()); + // Accept third tx + let tx3 = cursor.next(()).unwrap(); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be empty"); + // Mark transaction as committed + cursor.mark_committed(vec![*tx1.hash(), *tx3.hash()]); + } // ### Second flashblock // It should not return txs 1 and 3, but should return 2 - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1); - let tx2 = iterator.next(()).unwrap(); - // Check that it's empty - 
assert!(iterator.next(()).is_none(), "Iterator should be empty"); - // Mark transaction as commited - iterator.mark_commited(vec![*tx2.hash()]); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1); + let tx2 = cursor.next(()).unwrap(); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be empty"); + // Mark transaction as committed + cursor.mark_committed(vec![*tx2.hash()]); + } // ### Third flashblock - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2); - // Check that it's empty - assert!(iterator.next(()).is_none(), "Iterator should be empty"); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be empty"); + } } /// Test bundle cases @@ -176,62 +202,76 @@ mod tests { pool.add_transaction(Arc::new(tx_3), 0); pool.add_transaction(Arc::new(tx_4), 0); - // Create iterator - let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best())); + let mut committed = FlashblockCommittedTxs::default(); // ### First flashblock // should contain txs 1 and 2 - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0); - let tx1 = iterator.next(()).unwrap(); - assert_eq!(tx1.hash(), &tx_1_hash); - let tx2 = iterator.next(()).unwrap(); - assert_eq!(tx2.hash(), &tx_2_hash); - // Check that it's empty - assert!(iterator.next(()).is_none(), "Iterator should be empty"); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0); + let tx1 = cursor.next(()).unwrap(); + assert_eq!(tx1.hash(), &tx_1_hash); + let tx2 = cursor.next(()).unwrap(); + assert_eq!(tx2.hash(), &tx_2_hash); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be 
empty"); + } // ### Second flashblock // should contain txs 1, 2, and 3 - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1); - let tx1 = iterator.next(()).unwrap(); - assert_eq!(tx1.hash(), &tx_1_hash); - let tx2 = iterator.next(()).unwrap(); - assert_eq!(tx2.hash(), &tx_2_hash); - let tx3 = iterator.next(()).unwrap(); - assert_eq!(tx3.hash(), &tx_3_hash); - // Check that it's empty - assert!(iterator.next(()).is_none(), "Iterator should be empty"); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1); + let tx1 = cursor.next(()).unwrap(); + assert_eq!(tx1.hash(), &tx_1_hash); + let tx2 = cursor.next(()).unwrap(); + assert_eq!(tx2.hash(), &tx_2_hash); + let tx3 = cursor.next(()).unwrap(); + assert_eq!(tx3.hash(), &tx_3_hash); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be empty"); + } // ### Third flashblock // should contain txs 1, 3, and 4 - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2); - let tx1 = iterator.next(()).unwrap(); - assert_eq!(tx1.hash(), &tx_1_hash); - let tx3 = iterator.next(()).unwrap(); - assert_eq!(tx3.hash(), &tx_3_hash); - let tx4 = iterator.next(()).unwrap(); - assert_eq!(tx4.hash(), &tx_4_hash); - // Check that it's empty - assert!(iterator.next(()).is_none(), "Iterator should be empty"); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2); + let tx1 = cursor.next(()).unwrap(); + assert_eq!(tx1.hash(), &tx_1_hash); + let tx3 = cursor.next(()).unwrap(); + assert_eq!(tx3.hash(), &tx_3_hash); + let tx4 = cursor.next(()).unwrap(); + assert_eq!(tx4.hash(), &tx_4_hash); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be empty"); + } // ### Forth flashblock // should contain txs 1, 3, and 4 - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 
3); - let tx1 = iterator.next(()).unwrap(); - assert_eq!(tx1.hash(), &tx_1_hash); - let tx3 = iterator.next(()).unwrap(); - assert_eq!(tx3.hash(), &tx_3_hash); - let tx4 = iterator.next(()).unwrap(); - assert_eq!(tx4.hash(), &tx_4_hash); - // Check that it's empty - assert!(iterator.next(()).is_none(), "Iterator should be empty"); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 3); + let tx1 = cursor.next(()).unwrap(); + assert_eq!(tx1.hash(), &tx_1_hash); + let tx3 = cursor.next(()).unwrap(); + assert_eq!(tx3.hash(), &tx_3_hash); + let tx4 = cursor.next(()).unwrap(); + assert_eq!(tx4.hash(), &tx_4_hash); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be empty"); + } // ### Fifth flashblock // should contain txs 1 and 3 - iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 4); - let tx1 = iterator.next(()).unwrap(); - assert_eq!(tx1.hash(), &tx_1_hash); - let tx3 = iterator.next(()).unwrap(); - assert_eq!(tx3.hash(), &tx_3_hash); - // Check that it's empty - assert!(iterator.next(()).is_none(), "Iterator should be empty"); + { + let mut cursor = FlashblockPoolTxCursor::new(&mut committed); + cursor.refresh_iterator(BestPayloadTransactions::new(pool.best()), 4); + let tx1 = cursor.next(()).unwrap(); + assert_eq!(tx1.hash(), &tx_1_hash); + let tx3 = cursor.next(()).unwrap(); + assert_eq!(tx3.hash(), &tx_3_hash); + // Check that it's empty + assert!(cursor.next(()).is_none(), "Iterator should be empty"); + } } } diff --git a/crates/op-rbuilder/src/builder/generator.rs b/crates/op-rbuilder/src/builder/generator.rs index 44e58e49..917e9f26 100644 --- a/crates/op-rbuilder/src/builder/generator.rs +++ b/crates/op-rbuilder/src/builder/generator.rs @@ -20,7 +20,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; use tokio::{ - sync::{Notify, oneshot}, + sync::watch, time::{Duration, Sleep}, }; use tokio_util::sync::CancellationToken; @@ 
-53,10 +53,10 @@ pub(super) trait PayloadBuilder: Send + Sync + Clone { /// # Returns /// /// A `Result` indicating the build outcome or an error. - fn try_build( + async fn try_build( &self, args: BuildArguments, - best_payload: BlockCell, + best_payload_tx: watch::Sender>, ) -> Result<(), PayloadBuilderError>; } @@ -195,10 +195,9 @@ where executor: self.executor.clone(), builder: self.builder.clone(), config, - cell: BlockCell::new(), + payload_rx: None, cancel: cancel_token, deadline, - build_complete: None, cached_reads: self.maybe_pre_cached(parent_header.hash()), }; @@ -251,12 +250,11 @@ where /// /// See [PayloadBuilder] pub(crate) builder: Builder, - /// The cell that holds the built payload. - pub(crate) cell: BlockCell, + /// Receiver for the latest payload from the builder task. + pub(crate) payload_rx: Option>>, /// Cancellation token for the running job pub(crate) cancel: CancellationToken, pub(crate) deadline: Pin>, // Add deadline - pub(crate) build_complete: Option>>, /// Caches all disk reads for the state the new payloads builds on /// /// This is used to avoid reading the same state over and over again when new attempts are @@ -269,7 +267,7 @@ where Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, - Builder::BuiltPayload: Unpin + Clone, + Builder::BuiltPayload: Unpin + Clone + Send + Sync + 'static, { type PayloadAttributes = Builder::Attributes; type ResolvePayloadFuture = ResolvePayload; @@ -287,13 +285,11 @@ where &mut self, kind: PayloadKind, ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { - tracing::info!("Resolve kind {:?}", kind); - - // check if self.cell has a payload - self.cancel.cancel(); + info!("Resolve kind {:?}", kind); - let resolve_future = ResolvePayload::new(self.cell.wait_for_value()); - (resolve_future, KeepPayloadJobAlive::No) + let rx = self.payload_rx.take(); + let cancel = self.cancel.clone(); + (ResolvePayload::new(rx, cancel), 
KeepPayloadJobAlive::No) } } @@ -317,21 +313,23 @@ where pub(super) fn spawn_build_job(&mut self) { let builder = self.builder.clone(); let payload_config = self.config.clone(); - let cell = self.cell.clone(); let cancel = self.cancel.clone(); - let (tx, rx) = oneshot::channel(); - self.build_complete = Some(rx); + let (watch_tx, watch_rx) = watch::channel(None); + self.payload_rx = Some(watch_rx); let cached_reads = self.cached_reads.take().unwrap_or_default(); - self.executor.spawn_blocking_task(Box::pin(async move { + // try_build is not in a blocking task! + // We have to make sure any blocking work is handled individually within payload builder + self.executor.spawn_task(Box::pin(async move { let args = BuildArguments { cached_reads, config: payload_config, cancel, }; - let result = builder.try_build(args, cell); - let _ = tx.send(result); + if let Err(e) = builder.try_build(args, watch_tx).await { + tracing::error!("build task failed: {:?}", e); + } })); } } @@ -367,86 +365,47 @@ where } } -// A future that resolves when a payload becomes available in the BlockCell +/// A future that resolves with the latest payload value, waiting for the first publish if needed. +/// We wrap the inner future in this one to have a concrete type we can easily instantiate it. 
pub(super) struct ResolvePayload { - future: WaitForValue, -} - -impl ResolvePayload { - pub(super) fn new(future: WaitForValue) -> Self { - Self { future } - } + future: futures_util::future::BoxFuture<'static, Result>, } -impl Future for ResolvePayload { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut().future.poll_unpin(cx) { - Poll::Ready(value) => Poll::Ready(Ok(value)), - Poll::Pending => Poll::Pending, - } - } -} +impl ResolvePayload { + fn new(payload_rx: Option>>, cancel: CancellationToken) -> Self { + let future = async move { + let Some(mut rx) = payload_rx else { + return Err(PayloadBuilderError::Other( + "payload receiver missing".into(), + )); + }; -#[derive(Clone)] -pub(super) struct BlockCell { - inner: Arc>>, - notify: Arc, -} + loop { + if let Some(payload) = rx.borrow().clone() { + cancel.cancel(); + return Ok(payload); + } -impl BlockCell { - pub(super) fn new() -> Self { - Self { - inner: Arc::new(Mutex::new(None)), - notify: Arc::new(Notify::new()), + rx.changed().await.map_err(|_| { + PayloadBuilderError::Other("builder exited before producing payload".into()) + })?; + } } - } + .boxed(); - pub(super) fn set(&self, value: T) { - let mut inner = self.inner.lock().unwrap(); - *inner = Some(value); - self.notify.notify_one(); - } - - pub(super) fn get(&self) -> Option { - let inner = self.inner.lock().unwrap(); - inner.clone() - } - - // Return a future that resolves when value is set - pub(super) fn wait_for_value(&self) -> WaitForValue { - WaitForValue { cell: self.clone() } + Self { future } } } -#[derive(Clone)] -// Future that resolves when a value is set in BlockCell -pub(super) struct WaitForValue { - cell: BlockCell, -} - -impl Future for WaitForValue { - type Output = T; +impl Future for ResolvePayload { + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Some(value) = self.cell.get() { - Poll::Ready(value) - } else { - // Instead 
of register, we use notified() to get a future - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} - -impl Default for BlockCell { - fn default() -> Self { - Self::new() + self.get_mut().future.as_mut().poll(cx) } } -fn job_deadline(unix_timestamp_secs: u64) -> std::time::Duration { +fn job_deadline(unix_timestamp_secs: u64) -> Duration { let unix_now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -468,8 +427,9 @@ mod tests { use super::*; use alloy_eips::eip7685::Requests; use alloy_primitives::U256; + use futures_util::future::BoxFuture; use rand::rng; - use reth::tasks::TokioTaskExecutor; + use reth::tasks::{TaskSpawner, TokioTaskExecutor}; use reth_node_api::NodePrimitives; use reth_optimism_payload_builder::{OpPayloadPrimitives, payload::OpPayloadBuilderAttributes}; use reth_optimism_primitives::OpPrimitives; @@ -477,76 +437,54 @@ mod tests { use reth_primitives::SealedBlock; use reth_provider::test_utils::MockEthProvider; use reth_testing_utils::generators::{BlockRangeParams, random_block_range}; + use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::{ - task, - time::{Duration, sleep}, + task::JoinHandle, + time::{Duration, sleep, timeout}, }; - #[tokio::test] - async fn test_block_cell_wait_for_value() { - let cell = BlockCell::new(); - - // Spawn a task that will set the value after a delay - let cell_clone = cell.clone(); - task::spawn(async move { - sleep(Duration::from_millis(100)).await; - cell_clone.set(42); - }); - - // Wait for the value and verify - let wait_future = cell.wait_for_value(); - let result = wait_future.await; - assert_eq!(result, 42); + #[derive(Debug, Clone, Default)] + struct CountingTaskExecutor { + spawn_calls: Arc, + spawn_blocking_calls: Arc, } - #[tokio::test] - async fn test_block_cell_immediate_value() { - let cell = BlockCell::new(); - cell.set(42); - - // Value should be immediately available - let wait_future = cell.wait_for_value(); - let result = wait_future.await; - assert_eq!(result, 42); - } - - 
#[tokio::test] - async fn test_block_cell_multiple_waiters() { - let cell = BlockCell::new(); - - // Spawn multiple waiters - let wait1 = task::spawn({ - let cell = cell.clone(); - async move { cell.wait_for_value().await } - }); - - let wait2 = task::spawn({ - let cell = cell.clone(); - async move { cell.wait_for_value().await } - }); - - // Set value after a delay - sleep(Duration::from_millis(100)).await; - cell.set(42); + impl CountingTaskExecutor { + fn spawn_calls(&self) -> usize { + self.spawn_calls.load(Ordering::Relaxed) + } - // All waiters should receive the value - assert_eq!(wait1.await.unwrap(), 42); - assert_eq!(wait2.await.unwrap(), 42); + fn spawn_blocking_calls(&self) -> usize { + self.spawn_blocking_calls.load(Ordering::Relaxed) + } } - #[tokio::test] - async fn test_block_cell_update_value() { - let cell = BlockCell::new(); + impl TaskSpawner for CountingTaskExecutor { + fn spawn_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + self.spawn_calls.fetch_add(1, Ordering::Relaxed); + tokio::task::spawn(fut) + } - // Set initial value - cell.set(42); + fn spawn_critical_task( + &self, + _name: &'static str, + fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + self.spawn_task(fut) + } - // Set new value - cell.set(43); + fn spawn_blocking_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + self.spawn_blocking_calls.fetch_add(1, Ordering::Relaxed); + tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut)) + } - // Waiter should get the latest value - let result = cell.wait_for_value().await; - assert_eq!(result, 43); + fn spawn_critical_blocking_task( + &self, + _name: &'static str, + fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + self.spawn_blocking_task(fut) + } } #[derive(Debug, Clone)] @@ -574,8 +512,8 @@ mod tests { } } - #[derive(Clone, Debug, Default)] - struct MockPayload; + #[derive(Clone, Debug, Default, PartialEq, Eq)] + struct MockPayload(u64); impl BuiltPayload for 
MockPayload { type Primitives = OpPrimitives; @@ -614,22 +552,17 @@ mod tests { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = MockPayload; - fn try_build( + async fn try_build( &self, args: BuildArguments, - _best_payload: BlockCell, + best_payload_tx: watch::Sender>, ) -> Result<(), PayloadBuilderError> { self.new_event(BlockEvent::Started); + best_payload_tx.send_replace(Some(MockPayload(1))); - loop { - if args.cancel.is_cancelled() { - self.new_event(BlockEvent::Cancelled); - return Ok(()); - } - - // Small sleep to prevent tight loop - std::thread::sleep(Duration::from_millis(10)); - } + args.cancel.cancelled().await; + self.new_event(BlockEvent::Cancelled); + Ok(()) } } @@ -682,7 +615,7 @@ mod tests { config, builder.clone(), false, - std::time::Duration::from_secs(1), + Duration::from_secs(1), ); // this is not nice but necessary @@ -714,4 +647,117 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_spawn_build_job_uses_async_executor() -> eyre::Result<()> { + let mut rng = rng(); + + let client = MockEthProvider::default(); + let executor = CountingTaskExecutor::default(); + let config = BasicPayloadJobGeneratorConfig::default(); + let builder = MockBuilder::::new(); + + let blocks = random_block_range( + &mut rng, + 1..=1, + BlockRangeParams { + tx_count: 0..1, + ..Default::default() + }, + ); + client.extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.unseal()))); + + let generator = BlockPayloadJobGenerator::with_builder( + client.clone(), + executor.clone(), + config, + builder, + false, + Duration::from_secs(1), + ); + + let mut attr = OpPayloadBuilderAttributes::default(); + attr.payload_attributes.parent = client.latest_header()?.unwrap().hash(); + + let mut job = generator.new_payload_job(attr)?; + + assert_eq!(executor.spawn_calls(), 1); + assert_eq!(executor.spawn_blocking_calls(), 0); + + let _ = job.resolve(); + let _ = job.await; + + Ok(()) + } + + #[tokio::test] + async fn 
test_resolve_payload_waits_for_first_value() { + let (tx, rx) = watch::channel::>(None); + let cancel = CancellationToken::new(); + let resolve = ResolvePayload::new(Some(rx), cancel.clone()); + + tokio::spawn(async move { + sleep(Duration::from_millis(50)).await; + tx.send_replace(Some(MockPayload(7))); + }); + + let payload = timeout(Duration::from_secs(1), resolve) + .await + .expect("resolve should complete") + .expect("resolve should return payload"); + assert_eq!(payload, MockPayload(7)); + assert!(cancel.is_cancelled()); + } + + #[tokio::test] + async fn test_resolve_payload_returns_latest_value() { + let (tx, rx) = watch::channel::>(None); + tx.send_replace(Some(MockPayload(1))); + tx.send_replace(Some(MockPayload(2))); + + let cancel = CancellationToken::new(); + let payload = ResolvePayload::new(Some(rx), cancel.clone()) + .await + .expect("resolve should return payload"); + + assert_eq!(payload, MockPayload(2)); + assert!(cancel.is_cancelled()); + } + + #[tokio::test] + async fn test_resolve_payload_errors_if_builder_exits_without_payload() { + let (tx, rx) = watch::channel::>(None); + drop(tx); + + let _ = ResolvePayload::new(Some(rx), CancellationToken::new()) + .await + .expect_err("resolve should error when sender closes before value"); + } + + #[tokio::test] + async fn test_resolve_payload_errors_if_receiver_missing() { + let _ = ResolvePayload::::new(None, CancellationToken::new()) + .await + .expect_err("resolve should error when receiver is missing"); + } + + #[tokio::test] + async fn test_resolve_payload_cancels_after_payload_arrives() { + let (tx, rx) = watch::channel::>(None); + let cancel = CancellationToken::new(); + let handle = tokio::spawn(ResolvePayload::new(Some(rx), cancel.clone())); + + sleep(Duration::from_millis(20)).await; + assert!(!cancel.is_cancelled()); + + tx.send_replace(Some(MockPayload(9))); + let payload = timeout(Duration::from_secs(1), handle) + .await + .expect("task should finish") + .expect("task should not panic") + 
.expect("resolve should return payload"); + + assert_eq!(payload, MockPayload(9)); + assert!(cancel.is_cancelled()); + } } diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index 6ad3cc5e..2fec4d01 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -3,10 +3,10 @@ use crate::{ backrun_bundle::BackrunBundlesPayloadCtx, builder::{ BuilderConfig, - best_txs::BestFlashblocksTxs, + best_txs::{FlashblockCommittedTxs, FlashblockPoolTxCursor}, builder_tx::BuilderTransactions, context::OpPayloadBuilderCtx, - generator::{BlockCell, BuildArguments, PayloadBuilder}, + generator::{BuildArguments, PayloadBuilder}, timing::FlashblockScheduler, }, gas_limiter::AddressGasLimiter, @@ -26,7 +26,7 @@ use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, OpFlashblockPayloadMetadata, }; -use reth::payload::PayloadBuilderAttributes; +use reth::{payload::PayloadBuilderAttributes, tasks::TaskSpawner}; use reth_basic_payload_builder::BuildOutcome; use reth_chainspec::EthChainSpec; use reth_evm::{ConfigureEvm, execute::BlockBuilder}; @@ -48,8 +48,8 @@ use reth_revm::{ use reth_transaction_pool::TransactionPool; use reth_trie::{HashedPostState, TrieInput, updates::TrieUpdates}; use revm::Database; -use std::{collections::BTreeMap, sync::Arc, time::Instant}; -use tokio::sync::mpsc; +use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Instant}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, metadata::Level, span, warn}; @@ -71,7 +71,8 @@ fn convert_receipt(receipt: &OpReceipt) -> op_alloy_consensus::OpReceipt { } } -type NextBestFlashblocksTxs = BestFlashblocksTxs< +type NextFlashblockPoolTxCursor<'a, Pool> = FlashblockPoolTxCursor< + 'a, ::Transaction, Box< dyn reth_transaction_pool::BestTransactions< @@ -114,6 +115,26 @@ pub(super) struct FlashblocksState { 
prev_trie_updates: Option>, } +struct FallbackBuildOutput { + ctx: OpPayloadBuilderCtx, + info: ExecutionInfo, + payload: OpBuiltPayload, + fb_payload: OpFlashblockPayload, + cache: Cache, + transition: Transition, + fb_state: FlashblocksState, +} + +struct FlashblockBuildOutput { + ctx: OpPayloadBuilderCtx, + build_result: eyre::Result>, + cache: Cache, + transition: Transition, + committed_txs: FlashblockCommittedTxs, + info: ExecutionInfo, + fb_state: FlashblocksState, +} + impl FlashblocksState { fn new( target_flashblock_count: u64, @@ -234,36 +255,59 @@ impl FlashblocksState { // Flashblocks-specific helper methods moved to FlashblocksState /// Optimism's payload builder -#[derive(Debug, Clone)] -pub(super) struct OpPayloadBuilder { +#[derive(Debug)] +pub(super) struct OpPayloadBuilder { + inner: Arc>, +} + +#[derive(Debug)] +pub(super) struct OpPayloadBuilderInner { /// The type responsible for creating the evm. - pub evm_config: OpEvmConfig, + evm_config: OpEvmConfig, /// The transaction pool - pub pool: Pool, + pool: Pool, /// Node client - pub client: Client, + client: Client, /// Sender for sending built flashblock payloads to [`PayloadHandler`], /// which broadcasts outgoing flashblock payloads via p2p. - pub built_fb_payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, /// Sender for sending built full block payloads to [`PayloadHandler`], /// which updates the engine tree state. - pub built_payload_tx: mpsc::Sender, + built_payload_tx: mpsc::Sender, /// WebSocket publisher for broadcasting flashblocks /// to all connected subscribers. - pub ws_pub: Arc, + ws_pub: Arc, /// System configuration for the builder - pub config: BuilderConfig, + config: BuilderConfig, /// The metrics for the builder - pub metrics: Arc, + metrics: Arc, /// The end of builder transaction type - pub builder_tx: BuilderTx, + builder_tx: BuilderTx, /// Rate limiting based on gas. This is an optional feature. 
- pub address_gas_limiter: AddressGasLimiter, + address_gas_limiter: AddressGasLimiter, /// Tokio task metrics for monitoring spawned tasks - pub task_metrics: Arc, + task_metrics: Arc, + /// Task executor used to offload blocking work. + executor: Tasks, +} + +impl Deref for OpPayloadBuilder { + type Target = OpPayloadBuilderInner; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} + +impl Clone for OpPayloadBuilder { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } } -impl OpPayloadBuilder { +impl OpPayloadBuilder { #[expect(clippy::too_many_arguments)] pub(super) fn new( evm_config: OpEvmConfig, @@ -276,30 +320,35 @@ impl OpPayloadBuilder { ws_pub: Arc, metrics: Arc, task_metrics: Arc, + executor: Tasks, ) -> Self { let address_gas_limiter = AddressGasLimiter::new(config.gas_limiter_config.clone()); Self { - evm_config, - pool, - client, - built_fb_payload_tx, - built_payload_tx, - ws_pub, - config, - metrics, - builder_tx, - address_gas_limiter, - task_metrics, + inner: Arc::new(OpPayloadBuilderInner { + evm_config, + pool, + client, + built_fb_payload_tx, + built_payload_tx, + ws_pub, + config, + metrics, + builder_tx, + address_gas_limiter, + task_metrics, + executor, + }), } } } -impl reth_basic_payload_builder::PayloadBuilder - for OpPayloadBuilder +impl reth_basic_payload_builder::PayloadBuilder + for OpPayloadBuilder where - Pool: Clone + Send + Sync, - Client: Clone + Send + Sync, - BuilderTx: Clone + Send + Sync, + Pool: Clone + Send + Sync + 'static, + Client: Clone + Send + Sync + 'static, + BuilderTx: Send + Sync + 'static, + Tasks: Clone + Send + Sync + 'static, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; @@ -322,12 +371,28 @@ where } } -impl OpPayloadBuilder +impl OpPayloadBuilder where - Pool: PoolBounds, - Client: ClientBounds, - BuilderTx: BuilderTransactions + Send + Sync, + Pool: PoolBounds + 'static, + Client: ClientBounds + 'static, + BuilderTx: 
BuilderTransactions + Send + Sync + 'static, + Tasks: TaskSpawner + Clone + Send + Sync + 'static, { + /// Helper to spawn a blocking task that returns T in a oneshot channel + async fn run_blocking_task(&self, task: F) -> Result + where + T: Send + 'static, + F: FnOnce() -> Result + Send + 'static, + { + let (tx, rx) = oneshot::channel(); + self.executor.spawn_blocking_task(Box::pin(async move { + let _ = tx.send(task()); + })); + + rx.await + .map_err(|_| PayloadBuilderError::Other("blocking task dropped".into()))? + } + fn get_op_payload_builder_ctx( &self, config: reth_basic_payload_builder::PayloadConfig< @@ -406,14 +471,14 @@ where /// Given build arguments including an Optimism client, transaction pool, /// and configuration, this function creates a transaction payload. Returns /// a result indicating success with the payload or an error in case of failure. - fn build_payload( + async fn build_payload( &self, args: BuildArguments, OpBuiltPayload>, - best_payload: BlockCell, + best_payload_tx: watch::Sender>, ) -> Result<(), PayloadBuilderError> { let block_build_start_time = Instant::now(); let BuildArguments { - mut cached_reads, + cached_reads, config, cancel: block_cancel, } = args; @@ -442,48 +507,82 @@ where enable_incremental_state_root, ); - let state_provider = self.client.state_by_block_hash(ctx.parent().hash())?; - let db = StateProviderDatabase::new(&state_provider); self.address_gas_limiter.refresh(ctx.block_number()); - // 1. 
execute the pre steps and seal an early block with that - let sequencer_tx_start_time = Instant::now(); - let mut state = State::builder() - .with_database(cached_reads.as_db_mut(db)) - .with_bundle_update() - .build(); - - let mut info = execute_pre_steps(&mut state, &ctx)?; - let sequencer_tx_time = sequencer_tx_start_time.elapsed(); - ctx.metrics.sequencer_tx_duration.record(sequencer_tx_time); - ctx.metrics.sequencer_tx_gauge.set(sequencer_tx_time); - - // We add first builder tx right after deposits - if !ctx.attributes().no_tx_pool - && let Err(e) = self.builder_tx.add_builder_txs( - &state_provider, - &mut info, - &ctx, - &mut state, - false, - fb_state.is_first_flashblock(), - fb_state.is_last_flashblock(), - ) - { - error!( - target: "payload_builder", - "Error adding builder txs to fallback block: {}", - e - ); - }; + // Phase 1: Build the fallback block. + let FallbackBuildOutput { + ctx, + mut info, + payload, + fb_payload, + mut cache, + mut transition, + fb_state: returned_fb_state, + } = self + .run_blocking_task({ + let builder = self.clone(); + let ctx = ctx; + let mut fb_state = fb_state; + move || { + let state_provider = builder.client.state_by_block_hash(ctx.parent().hash())?; + let db = StateProviderDatabase::new(&state_provider); + + // 1. 
execute the pre steps and seal an early block with that + let sequencer_tx_start_time = Instant::now(); + let mut cached_reads = cached_reads; + let mut state = State::builder() + .with_database(cached_reads.as_db_mut(db)) + .with_bundle_update() + .build(); + + let mut info = execute_pre_steps(&mut state, &ctx)?; + let sequencer_tx_time = sequencer_tx_start_time.elapsed(); + ctx.metrics.sequencer_tx_duration.record(sequencer_tx_time); + ctx.metrics.sequencer_tx_gauge.set(sequencer_tx_time); + + // We add first builder tx right after deposits + if !ctx.attributes().no_tx_pool + && let Err(e) = builder.builder_tx.add_builder_txs( + &state_provider, + &mut info, + &ctx, + &mut state, + false, + fb_state.is_first_flashblock(), + fb_state.is_last_flashblock(), + ) + { + error!( + target: "payload_builder", + "Error adding builder txs to fallback block: {}", + e + ); + }; - let (payload, fb_payload) = build_block( - &mut state, - &ctx, - Some(&mut fb_state), - &mut info, - !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync - )?; + let (payload, fb_payload) = build_block( + &mut state, + &ctx, + Some(&mut fb_state), + &mut info, + !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync + )?; + + // we can safely take from state as we drop it at the end of the scope + let cache = std::mem::take(&mut state.cache); + let transition = state.transition_state.take(); + Ok(FallbackBuildOutput { + ctx, + info, + payload, + fb_payload, + cache, + transition, + fb_state, + }) + } + }) + .await?; + fb_state = returned_fb_state; self.built_fb_payload_tx .try_send(payload.clone()) @@ -495,7 +594,7 @@ where "Failed to send updated payload" ); } - best_payload.set(payload); + best_payload_tx.send_replace(Some(payload)); info!( target: "payload_builder", @@ -606,13 +705,7 @@ where .get_op_payload_builder_ctx(config, fb_cancel.clone()) .map_err(|e| PayloadBuilderError::Other(e.into()))?; - // Create 
best_transaction iterator - let mut best_txs = BestFlashblocksTxs::new(BestPayloadTransactions::new( - self.pool - .best_transactions_with_attributes(ctx.best_transaction_attributes()), - )); - - let (tx, rx) = std::sync::mpsc::sync_channel((expected_flashblocks + 1) as usize); + let (tx, mut rx) = mpsc::channel((expected_flashblocks + 1) as usize); tokio::spawn( self.task_metrics .flashblock_timer @@ -624,23 +717,29 @@ where )), ); - // Process flashblocks - block on async channel receive + // State data was extracted in Phase 1 block scope above. + // We carry (CacheState, Option) between iterations + // and reconstruct State inside each sync scope. + let mut committed_txs = FlashblockCommittedTxs::default(); + let parent_hash = ctx.parent_hash(); + + // Process flashblocks - async channel receive loop { // Wait for signal before building flashblock. - if let Ok(new_fb_cancel) = rx.recv() { - debug!( - target: "payload_builder", - id = %fb_payload.payload_id, - flashblock_index = fb_state.flashblock_index(), - block_number = ctx.block_number(), - "Received signal to build flashblock", - ); - ctx = ctx.with_cancel(new_fb_cancel); - } else { + let Some(new_fb_cancel) = rx.recv().await else { // Channel closed - block building cancelled self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); return Ok(()); - } + }; + + debug!( + target: "payload_builder", + id = %fb_payload.payload_id, + flashblock_index = fb_state.flashblock_index(), + block_number = ctx.block_number(), + "Received signal to build flashblock", + ); + ctx = ctx.with_cancel(new_fb_cancel); let fb_span = if span.is_none() { tracing::Span::none() @@ -651,20 +750,78 @@ where "build_flashblock", ) }; - let _entered = fb_span.enter(); - - // Build flashblock after receiving signal - let next_flashblock_state = match self.build_next_flashblock( - &ctx, - &mut fb_state, - &mut info, - &mut state, - &state_provider, - &mut best_txs, - &block_cancel, - &best_payload, - ) { - 
Ok(Some(next_flashblock_state)) => next_flashblock_state, + let FlashblockBuildOutput { + ctx: returned_ctx, + build_result, + cache: new_cache, + transition: new_transition, + committed_txs: new_committed, + info: new_info, + fb_state: returned_fb_state, + } = tracing::Instrument::instrument( + self.run_blocking_task({ + let builder = self.clone(); + let ctx = ctx; + let block_cancel = block_cancel.clone(); + let info = info; + let cache = cache; + let transition = transition; + let mut committed_txs = committed_txs; + let fb_state = fb_state; + move || { + // reconstruct state + let state_provider = builder.client.state_by_block_hash(parent_hash)?; + let mut state = State::builder() + .with_database(StateProviderDatabase::new(&state_provider)) + .with_cached_prestate(cache) + .with_bundle_update() + .build(); + state.transition_state = transition; + + let mut best_txs = FlashblockPoolTxCursor::new(&mut committed_txs); + + let mut info = info; + let mut fb_state = fb_state; + let result = builder.build_next_flashblock( + &ctx, + &mut fb_state, + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + ); + + let cache = std::mem::take(&mut state.cache); + let transition_state = state.transition_state.take(); + + Ok(FlashblockBuildOutput { + ctx, + build_result: result, + cache, + transition: transition_state, + committed_txs, + info, + fb_state, + }) + } + }), + fb_span, + ) + .await?; + + ctx = returned_ctx; + fb_state = returned_fb_state; + info = new_info; + cache = new_cache; + transition = new_transition; + committed_txs = new_committed; + + let next_flashblock_state = match build_result { + Ok(Some((next_flashblock_state, new_payload))) => { + best_payload_tx.send_replace(Some(new_payload)); + next_flashblock_state + } Ok(None) => { self.record_flashblocks_metrics( &ctx, @@ -694,6 +851,7 @@ where #[expect(clippy::too_many_arguments)] fn build_next_flashblock< + 'a, DB: Database + std::fmt::Debug + AsRef

, P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, >( @@ -703,10 +861,9 @@ where info: &mut ExecutionInfo, state: &mut State, state_provider: impl reth::providers::StateProvider + Clone, - best_txs: &mut NextBestFlashblocksTxs, + best_txs: &mut NextFlashblockPoolTxCursor<'a, Pool>, block_cancel: &CancellationToken, - best_payload: &BlockCell, - ) -> eyre::Result> { + ) -> eyre::Result> { let flashblock_index = fb_state.flashblock_index(); let mut target_gas_for_batch = fb_state.target_gas_for_batch(); let mut target_da_for_batch = fb_state.target_da_for_batch(); @@ -821,8 +978,8 @@ where .slice_new_transactions(&info.executed_transactions) .iter() .map(|tx| tx.tx_hash()) - .collect(); - best_txs.mark_commited(new_transactions); + .collect::>(); + best_txs.mark_committed(new_transactions); // We got block cancelled, we won't need anything from the block at this point // Caution: this assume that block cancel token only cancelled when new FCU is received @@ -895,8 +1052,6 @@ where "Failed to send updated payload" ); } - best_payload.set(new_payload); - // Record flashblock build duration ctx.metrics .flashblock_build_duration @@ -946,7 +1101,7 @@ where "Flashblock built" ); - Ok(Some(next_flashblock_state)) + Ok(Some((next_flashblock_state, new_payload))) } } } @@ -991,19 +1146,21 @@ where } #[async_trait::async_trait] -impl PayloadBuilder for OpPayloadBuilder +impl PayloadBuilder + for OpPayloadBuilder where - Pool: PoolBounds, - Client: ClientBounds, - BuilderTx: BuilderTransactions + Clone + Send + Sync, + Pool: PoolBounds + 'static, + Client: ClientBounds + 'static, + BuilderTx: BuilderTransactions + Send + Sync + 'static, + Tasks: TaskSpawner + Clone + Send + Sync + 'static, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; - fn try_build( + async fn try_build( &self, args: BuildArguments, - best_payload: BlockCell, + best_payload_tx: watch::Sender>, ) -> Result<(), PayloadBuilderError> { let span = if 
cfg!(feature = "telemetry") && args @@ -1016,8 +1173,7 @@ where } else { tracing::Span::none() }; - let _entered = span.enter(); - self.build_payload(args, best_payload) + tracing::Instrument::instrument(self.build_payload(args, best_payload_tx), span).await } } diff --git a/crates/op-rbuilder/src/builder/service.rs b/crates/op-rbuilder/src/builder/service.rs index 552fba9d..b1b4fafe 100644 --- a/crates/op-rbuilder/src/builder/service.rs +++ b/crates/op-rbuilder/src/builder/service.rs @@ -128,6 +128,7 @@ impl FlashblocksServiceBuilder { ws_pub.clone(), metrics.clone(), task_metrics.clone(), + ctx.task_executor().clone(), ); let payload_job_config = BasicPayloadJobGeneratorConfig::default(); diff --git a/crates/op-rbuilder/src/builder/timing.rs b/crates/op-rbuilder/src/builder/timing.rs index e892843f..dde31667 100644 --- a/crates/op-rbuilder/src/builder/timing.rs +++ b/crates/op-rbuilder/src/builder/timing.rs @@ -1,7 +1,8 @@ use core::time::Duration; -use std::{ops::Rem, sync::mpsc::SyncSender}; +use std::ops::Rem; use reth_payload_builder::PayloadId; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; @@ -61,7 +62,7 @@ impl FlashblockScheduler { /// Runs the scheduler, sending flashblock triggers at the scheduled times. pub(super) async fn run( self, - tx: SyncSender, + tx: mpsc::Sender, block_cancel: CancellationToken, mut fb_cancel: CancellationToken, payload_id: PayloadId, @@ -89,7 +90,7 @@ impl FlashblockScheduler { "Sending flashblock trigger" ); - if tx.send(fb_cancel.clone()).is_err() { + if tx.send(fb_cancel.clone()).await.is_err() { // receiver channel was dropped, return. 
this will only // happen if the `build_payload` function returns, due // to payload building error or the main cancellation From 5efd0cc0208f80b9a0b413c476b159b4b981c1b9 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Tue, 17 Mar 2026 16:03:40 +0800 Subject: [PATCH 2/7] feat: structured cancellation, select! state machine, tracing spans - PayloadJobCancellation replaces single CancellationToken with new_fcu/resolved/deadline/any tokens for deterministic cancellation - Flashblock loop refactored to biased select! state machine - Resolved gate prevents publishing flashblocks after getPayload - Tracing spans: build_fallback, build_flashblock (with index/tx_count/ gas_used), execute_pool_txs, seal_block, state_root - New metrics: payload_job_cancellation_{resolved,new_fcu,deadline, complete,error}, flashblock_publish_suppressed_total - CancellationReason enum with typed reason values --- .../op-rbuilder/src/builder/cancellation.rs | 211 ++++++++++++++++++ crates/op-rbuilder/src/builder/context.rs | 1 + crates/op-rbuilder/src/builder/generator.rs | 86 +++---- crates/op-rbuilder/src/builder/mod.rs | 6 +- crates/op-rbuilder/src/builder/payload.rs | 167 +++++++++++--- crates/op-rbuilder/src/metrics.rs | 12 + 6 files changed, 410 insertions(+), 73 deletions(-) create mode 100644 crates/op-rbuilder/src/builder/cancellation.rs diff --git a/crates/op-rbuilder/src/builder/cancellation.rs b/crates/op-rbuilder/src/builder/cancellation.rs new file mode 100644 index 00000000..8edc8fc2 --- /dev/null +++ b/crates/op-rbuilder/src/builder/cancellation.rs @@ -0,0 +1,211 @@ +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; + +/// Structured cancellation for a single payload building job. +/// +/// Three distinct tokens that can distinguish *why* building was stopped: +/// - `new_fcu`: A new FCU arrived (`ensure_only_one_payload`). Abandon all work. +/// - `resolved`: `getPayload` was called. 
Stop building, don't publish new flashblocks.
+/// - `deadline`: The payload job deadline was reached. Stop all work.
+///
+/// `any` fires when ANY of the above fires. It is used for tasks that should stop regardless of cancellation reason.
+///
+/// Use `cancel_new_fcu()`, `cancel_resolved()`, `cancel_deadline()` to fire
+/// a specific source; these also cancel `any` automatically.
+///
+/// These fields must remain private to enforce the invariant that `any` is always canceled
+/// alongside any specific token. Use accessor methods to read token state.
+#[derive(Clone)]
+pub(crate) struct PayloadJobCancellation {
+    new_fcu: CancellationToken,
+    resolved: CancellationToken,
+    deadline: CancellationToken,
+    any: CancellationToken,
+}
+
+/// Why a payload job was canceled.
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum CancellationReason {
+    Resolved,
+    NewFcu,
+    Deadline,
+    /// Job completed normally (all scheduled flashblocks built before resolve/fcu).
+    Complete,
+}
+
+impl CancellationReason {
+    pub(crate) fn as_str(self) -> &'static str {
+        match self {
+            Self::Resolved => "resolved",
+            Self::NewFcu => "new_fcu",
+            Self::Deadline => "deadline",
+            Self::Complete => "complete",
+        }
+    }
+}
+
+impl PayloadJobCancellation {
+    /// Creates a new `PayloadJobCancellation` with all tokens uncancelled.
+    pub(crate) fn new() -> Self {
+        Self {
+            new_fcu: CancellationToken::new(),
+            resolved: CancellationToken::new(),
+            deadline: CancellationToken::new(),
+            any: CancellationToken::new(),
+        }
+    }
+
+    /// Fires `new_fcu` token and `any`.
+    pub(crate) fn cancel_new_fcu(&self) {
+        self.new_fcu.cancel();
+        self.any.cancel();
+    }
+
+    /// Fires `resolved` token and `any`.
+    pub(crate) fn cancel_resolved(&self) {
+        self.resolved.cancel();
+        self.any.cancel();
+    }
+
+    /// Fires `deadline` token and `any`.
+    pub(crate) fn cancel_deadline(&self) {
+        self.deadline.cancel();
+        self.any.cancel();
+    }
+
+    /// Returns true if any cancellation source has fired. 
+ pub(crate) fn is_cancelled(&self) -> bool { + self.any.is_cancelled() + } + + /// Returns true if `resolved` specifically was cancelled. + pub(crate) fn is_resolved(&self) -> bool { + self.resolved.is_cancelled() + } + + /// Returns true if `new_fcu` specifically was cancelled. + pub(crate) fn is_new_fcu(&self) -> bool { + self.new_fcu.is_cancelled() + } + + /// Future that resolves when `resolved` is cancelled. + pub(crate) fn resolved_cancelled(&self) -> WaitForCancellationFuture<'_> { + self.resolved.cancelled() + } + + /// Future that resolves when `new_fcu` is cancelled. + pub(crate) fn new_fcu_cancelled(&self) -> WaitForCancellationFuture<'_> { + self.new_fcu.cancelled() + } + + /// Returns the `any` token. + /// Passed to blocking tasks and the scheduler. + pub(crate) fn any_token(&self) -> CancellationToken { + self.any.clone() + } + + /// Creates a child token of `any`. + /// Useful for per-flashblock cancellation. + #[allow(dead_code)] + pub(crate) fn child_token(&self) -> CancellationToken { + self.any.child_token() + } + + /// Returns the reason this job was canceled, or `Complete` if not canceled. 
+ pub(crate) fn reason(&self) -> CancellationReason { + if self.resolved.is_cancelled() { + CancellationReason::Resolved + } else if self.new_fcu.is_cancelled() { + CancellationReason::NewFcu + } else if self.deadline.is_cancelled() { + CancellationReason::Deadline + } else { + CancellationReason::Complete + } + } +} + +impl Default for PayloadJobCancellation { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for PayloadJobCancellation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PayloadJobCancellation") + .field("new_fcu", &self.new_fcu.is_cancelled()) + .field("resolved", &self.resolved.is_cancelled()) + .field("deadline", &self.deadline.is_cancelled()) + .field("any", &self.any.is_cancelled()) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{Duration, timeout}; + + #[tokio::test] + async fn test_cancel_new_fcu_fires_any() { + let cancel = PayloadJobCancellation::new(); + assert!(!cancel.is_cancelled()); + + cancel.cancel_new_fcu(); + assert!(cancel.is_cancelled()); + assert!(cancel.is_new_fcu()); + assert!(!cancel.is_resolved()); + assert!(matches!(cancel.reason(), CancellationReason::NewFcu)); + } + + #[tokio::test] + async fn test_cancel_resolved_fires_any() { + let cancel = PayloadJobCancellation::new(); + cancel.cancel_resolved(); + assert!(cancel.is_cancelled()); + assert!(cancel.is_resolved()); + assert!(!cancel.is_new_fcu()); + assert!(matches!(cancel.reason(), CancellationReason::Resolved)); + } + + #[tokio::test] + async fn test_cancel_deadline_fires_any() { + let cancel = PayloadJobCancellation::new(); + cancel.cancel_deadline(); + assert!(cancel.is_cancelled()); + assert!(!cancel.is_new_fcu()); + assert!(!cancel.is_resolved()); + assert!(matches!(cancel.reason(), CancellationReason::Deadline)); + } + + #[tokio::test] + async fn test_any_awaitable() { + let cancel = PayloadJobCancellation::new(); + let any = cancel.any_token(); + + tokio::spawn(async 
move { + tokio::time::sleep(Duration::from_millis(10)).await; + cancel.cancel_resolved(); + }); + + timeout(Duration::from_millis(100), any.cancelled()) + .await + .expect("any should fire when resolved fires"); + } + + #[tokio::test] + async fn test_child_token_cancelled_by_any() { + let cancel = PayloadJobCancellation::new(); + let child = cancel.child_token(); + assert!(!child.is_cancelled()); + + cancel.cancel_resolved(); + assert!(child.is_cancelled()); + } + + #[tokio::test] + async fn test_reason_complete_when_not_cancelled() { + let cancel = PayloadJobCancellation::new(); + assert!(matches!(cancel.reason(), CancellationReason::Complete)); + } +} diff --git a/crates/op-rbuilder/src/builder/context.rs b/crates/op-rbuilder/src/builder/context.rs index 1119b050..b1167f3f 100644 --- a/crates/op-rbuilder/src/builder/context.rs +++ b/crates/op-rbuilder/src/builder/context.rs @@ -401,6 +401,7 @@ impl OpPayloadBuilderCtx { /// /// Returns `Ok(Some(())` if the job was cancelled. #[expect(clippy::too_many_arguments)] + #[tracing::instrument(level = "info", name = "execute_pool_txs", skip_all)] pub(super) fn execute_best_transactions( &self, info: &mut ExecutionInfo, diff --git a/crates/op-rbuilder/src/builder/generator.rs b/crates/op-rbuilder/src/builder/generator.rs index 917e9f26..467819d7 100644 --- a/crates/op-rbuilder/src/builder/generator.rs +++ b/crates/op-rbuilder/src/builder/generator.rs @@ -23,9 +23,10 @@ use tokio::{ sync::watch, time::{Duration, Sleep}, }; -use tokio_util::sync::CancellationToken; use tracing::info; +use super::cancellation::PayloadJobCancellation; + /// A trait for building payloads that encapsulate Ethereum transactions. 
/// /// This trait provides the `try_build` method to construct a transaction payload @@ -75,10 +76,11 @@ pub(super) struct BlockPayloadJobGenerator { builder: Builder, /// Whether to ensure only one payload is being processed at a time ensure_only_one_payload: bool, - /// The last payload being processed - last_payload: Arc>, + /// The last payload's cancellation. + /// `cancel_new_fcu()` is called when a new FCU arrives + last_payload_cancel: Arc>, /// The extra block deadline in seconds - extra_block_deadline: std::time::Duration, + extra_block_deadline: Duration, /// Stored `cached_reads` for new payload jobs. pre_cached: Option, } @@ -94,7 +96,7 @@ impl BlockPayloadJobGenerator { config: BasicPayloadJobGeneratorConfig, builder: Builder, ensure_only_one_payload: bool, - extra_block_deadline: std::time::Duration, + extra_block_deadline: Duration, ) -> Self { Self { client, @@ -102,7 +104,7 @@ impl BlockPayloadJobGenerator { _config: config, builder, ensure_only_one_payload, - last_payload: Arc::new(Mutex::new(CancellationToken::new())), + last_payload_cancel: Arc::new(Mutex::new(PayloadJobCancellation::new())), extra_block_deadline, pre_cached: None, } @@ -139,22 +141,22 @@ where &self, attributes: ::Attributes, ) -> Result { - let cancel_token = if self.ensure_only_one_payload { - // Cancel existing payload + let cancellation = if self.ensure_only_one_payload { + // Cancel existing payload via new_fcu { - let last_payload = self.last_payload.lock().unwrap(); - last_payload.cancel(); + let last_cancel = self.last_payload_cancel.lock().unwrap(); + last_cancel.cancel_new_fcu(); } - // Create and set new cancellation token with a fresh lock - let cancel_token = CancellationToken::new(); + // Create new PayloadJobCancellation and store it + let cancellation = PayloadJobCancellation::new(); { - let mut last_payload = self.last_payload.lock().unwrap(); - *last_payload = cancel_token.clone(); + let mut last_cancel = self.last_payload_cancel.lock().unwrap(); + 
*last_cancel = cancellation.clone(); } - cancel_token + cancellation } else { - CancellationToken::new() + PayloadJobCancellation::new() }; let parent_header = if attributes.parent().is_zero() { @@ -174,9 +176,8 @@ where // the payload job stops and cannot be queried again. With tight deadlines close // to the block number, we risk reaching the deadline before the node queries the payload. // - // Adding 0.5 seconds as wiggle room since block times are shorter here. - // TODO: A better long-term solution would be to implement cancellation logic - // that cancels existing jobs when receiving new block building requests. + // PayloadJobCancellation fires `deadline` specifically (separate from new_fcu/resolved). + // The leeway is mainly for the batcher avalanche scenario described below. // // When batcher's max channel duration is big enough (e.g. 10m), the // sequencer would send an avalanche of FCUs/getBlockByNumber on @@ -196,7 +197,7 @@ where builder: self.builder.clone(), config, payload_rx: None, - cancel: cancel_token, + cancel: cancellation, deadline, cached_reads: self.maybe_pre_cached(parent_header.hash()), }; @@ -252,8 +253,8 @@ where pub(crate) builder: Builder, /// Receiver for the latest payload from the builder task. 
pub(crate) payload_rx: Option>>, - /// Cancellation token for the running job - pub(crate) cancel: CancellationToken, + /// Structured cancellation for the running job + pub(crate) cancel: PayloadJobCancellation, pub(crate) deadline: Pin>, // Add deadline /// Caches all disk reads for the state the new payloads builds on /// @@ -288,8 +289,11 @@ where info!("Resolve kind {:?}", kind); let rx = self.payload_rx.take(); - let cancel = self.cancel.clone(); - (ResolvePayload::new(rx, cancel), KeepPayloadJobAlive::No) + let cancellation = self.cancel.clone(); + ( + ResolvePayload::new(rx, cancellation), + KeepPayloadJobAlive::No, + ) } } @@ -298,8 +302,8 @@ pub(super) struct BuildArguments { pub cached_reads: CachedReads, /// How to configure the payload. pub config: PayloadConfig>, - /// A marker that can be used to cancel the job. - pub cancel: CancellationToken, + /// Structured cancellation. + pub cancel: PayloadJobCancellation, } /// A [PayloadJob] is a future that's being polled by the `PayloadBuilderService` @@ -313,7 +317,7 @@ where pub(super) fn spawn_build_job(&mut self) { let builder = self.builder.clone(); let payload_config = self.config.clone(); - let cancel = self.cancel.clone(); + let cancellation = self.cancel.clone(); let (watch_tx, watch_rx) = watch::channel(None); self.payload_rx = Some(watch_rx); @@ -324,7 +328,7 @@ where let args = BuildArguments { cached_reads, config: payload_config, - cancel, + cancel: cancellation, }; if let Err(e) = builder.try_build(args, watch_tx).await { @@ -350,12 +354,12 @@ where // Check if deadline is reached if this.deadline.as_mut().poll(cx).is_ready() { - this.cancel.cancel(); + this.cancel.cancel_deadline(); tracing::debug!("Deadline reached"); return Poll::Ready(Ok(())); } - // If cancelled via resolve_kind() + // If canceled via any source if this.cancel.is_cancelled() { tracing::debug!("Job cancelled"); return Poll::Ready(Ok(())); @@ -372,7 +376,10 @@ pub(super) struct ResolvePayload { } impl ResolvePayload { - fn 
new(payload_rx: Option>>, cancel: CancellationToken) -> Self { + fn new( + payload_rx: Option>>, + cancellation: PayloadJobCancellation, + ) -> Self { let future = async move { let Some(mut rx) = payload_rx else { return Err(PayloadBuilderError::Other( @@ -382,7 +389,7 @@ impl ResolvePayload { loop { if let Some(payload) = rx.borrow().clone() { - cancel.cancel(); + cancellation.cancel_resolved(); return Ok(payload); } @@ -560,7 +567,7 @@ mod tests { self.new_event(BlockEvent::Started); best_payload_tx.send_replace(Some(MockPayload(1))); - args.cancel.cancelled().await; + args.cancel.any_token().cancelled().await; self.new_event(BlockEvent::Cancelled); Ok(()) } @@ -693,7 +700,7 @@ mod tests { #[tokio::test] async fn test_resolve_payload_waits_for_first_value() { let (tx, rx) = watch::channel::>(None); - let cancel = CancellationToken::new(); + let cancel = PayloadJobCancellation::new(); let resolve = ResolvePayload::new(Some(rx), cancel.clone()); tokio::spawn(async move { @@ -706,6 +713,7 @@ mod tests { .expect("resolve should complete") .expect("resolve should return payload"); assert_eq!(payload, MockPayload(7)); + assert!(cancel.is_resolved()); assert!(cancel.is_cancelled()); } @@ -715,12 +723,13 @@ mod tests { tx.send_replace(Some(MockPayload(1))); tx.send_replace(Some(MockPayload(2))); - let cancel = CancellationToken::new(); + let cancel = PayloadJobCancellation::new(); let payload = ResolvePayload::new(Some(rx), cancel.clone()) .await .expect("resolve should return payload"); assert_eq!(payload, MockPayload(2)); + assert!(cancel.is_resolved()); assert!(cancel.is_cancelled()); } @@ -729,14 +738,14 @@ mod tests { let (tx, rx) = watch::channel::>(None); drop(tx); - let _ = ResolvePayload::new(Some(rx), CancellationToken::new()) + let _ = ResolvePayload::new(Some(rx), PayloadJobCancellation::new()) .await .expect_err("resolve should error when sender closes before value"); } #[tokio::test] async fn test_resolve_payload_errors_if_receiver_missing() { - let _ = 
ResolvePayload::::new(None, CancellationToken::new())
+    let _ = ResolvePayload::::new(None, PayloadJobCancellation::new())
         .await
         .expect_err("resolve should error when receiver is missing");
 }
@@ -744,7 +753,7 @@
 #[tokio::test]
 async fn test_resolve_payload_cancels_after_payload_arrives() {
 let (tx, rx) = watch::channel::>(None);
-        let cancel = CancellationToken::new();
+        let cancel = PayloadJobCancellation::new();
 let handle = tokio::spawn(ResolvePayload::new(Some(rx), cancel.clone()));
 sleep(Duration::from_millis(20)).await;
@@ -758,6 +767,7 @@
 .expect("resolve should return payload");
 assert_eq!(payload, MockPayload(9));
+        assert!(cancel.is_resolved());
 assert!(cancel.is_cancelled());
 }
 }
diff --git a/crates/op-rbuilder/src/builder/mod.rs b/crates/op-rbuilder/src/builder/mod.rs
index a8c27c03..342c4a09 100644
--- a/crates/op-rbuilder/src/builder/mod.rs
+++ b/crates/op-rbuilder/src/builder/mod.rs
@@ -11,6 +11,7 @@ use crate::{
 mod best_txs;
 mod builder_tx;
+pub(crate) mod cancellation;
 mod config;
 mod context;
 mod flashblocks_builder_tx;
@@ -62,9 +63,8 @@ pub struct BuilderConfig {
 // the payload job stops and cannot be queried again. With tight deadlines close
 // to the block number, we risk reaching the deadline before the node queries the payload.
 //
-// Adding 0.5 seconds as wiggle room since block times are shorter here.
-// TODO: A better long-term solution would be to implement cancellation logic
-// that cancels existing jobs when receiving new block building requests.
+// PayloadJobCancellation now distinguishes new_fcu/resolved/deadline, so the leeway
+// is mainly for the batcher avalanche scenario described below.
 //
 // When batcher's max channel duration is big enough (e.g. 
10m), the // sequencer would send an avalanche of FCUs/getBlockByNumber on diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index 2fec4d01..9ad6b560 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -480,8 +480,10 @@ where let BuildArguments { cached_reads, config, - cancel: block_cancel, + cancel: cancellation, } = args; + // Use `any` as the general cancellation token + let block_cancel = cancellation.any_token(); // The build_payload span is created and instrumented in try_build() using // tracing::Instrument, which safely manages it across async .await points. @@ -510,6 +512,11 @@ where self.address_gas_limiter.refresh(ctx.block_number()); // Phase 1: Build the fallback block. + let fallback_span = if span.is_none() { + tracing::Span::none() + } else { + span!(parent: &span, Level::INFO, "build_fallback") + }; let FallbackBuildOutput { ctx, mut info, @@ -518,8 +525,8 @@ where mut cache, mut transition, fb_state: returned_fb_state, - } = self - .run_blocking_task({ + } = tracing::Instrument::instrument( + self.run_blocking_task({ let builder = self.clone(); let ctx = ctx; let mut fb_state = fb_state; @@ -580,8 +587,10 @@ where fb_state, }) } - }) - .await?; + }), + fallback_span, + ) + .await?; fb_state = returned_fb_state; self.built_fb_payload_tx @@ -723,13 +732,31 @@ where let mut committed_txs = FlashblockCommittedTxs::default(); let parent_hash = ctx.parent_hash(); - // Process flashblocks - async channel receive + // State machine: explicit select! at every phase for deterministic cancellation. loop { - // Wait for signal before building flashblock. - let Some(new_fb_cancel) = rx.recv().await else { - // Channel closed - block building cancelled - self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); - return Ok(()); + // Phase 1: Wait for scheduler trigger, or exit on cancellation. + let new_fb_cancel = tokio::select! 
{ + // ensures cancellation is checked before trigger. + biased; + _ = cancellation.resolved_cancelled() => { + Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); + return Ok(()); + } + _ = cancellation.new_fcu_cancelled() => { + Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); + return Ok(()); + } + trigger = rx.recv() => match trigger { + Some(t) => t, + None => { + // Channel closed — scheduler exhausted or canceled + Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); + return Ok(()); + } + }, }; debug!( @@ -748,18 +775,30 @@ where parent: &span, Level::INFO, "build_flashblock", + flashblock_index = fb_state.flashblock_index(), + block_number = ctx.block_number(), + tx_count = tracing::field::Empty, + gas_used = tracing::field::Empty, ) }; - let FlashblockBuildOutput { - ctx: returned_ctx, - build_result, - cache: new_cache, - transition: new_transition, - committed_txs: new_committed, - info: new_info, - fb_state: returned_fb_state, - } = tracing::Instrument::instrument( - self.run_blocking_task({ + + // Phase 2: Build flashblock (blocking task), or exit on cancellation. + // Note: ctx, info, cache, transition, committed_txs, fb_state are moved into + // the blocking task closure. If a cancellation branch fires, the blocking task + // is dropped (the thread finishes but the oneshot result is discarded). + let build_output = tokio::select! 
{ + biased; + // Suppressed flashblock: we received getResolve during flashblock building + _ = cancellation.resolved_cancelled() => { + self.metrics.flashblock_publish_suppressed_total.increment(1); + Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + return Ok(()); + } + _ = cancellation.new_fcu_cancelled() => { + Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + return Ok(()); + } + result = self.run_blocking_task({ let builder = self.clone(); let ctx = ctx; let block_cancel = block_cancel.clone(); @@ -768,7 +807,11 @@ where let transition = transition; let mut committed_txs = committed_txs; let fb_state = fb_state; + let fb_span = fb_span.clone(); move || { + // Enter the flashblock span so child spans are properly parented + let _enter = fb_span.enter(); + // reconstruct state let state_provider = builder.client.state_by_block_hash(parent_hash)?; let mut state = State::builder() @@ -805,10 +848,18 @@ where fb_state, }) } - }), - fb_span, - ) - .await?; + }) => result?, + }; + + let FlashblockBuildOutput { + ctx: returned_ctx, + build_result, + cache: new_cache, + transition: new_transition, + committed_txs: new_committed, + info: new_info, + fb_state: returned_fb_state, + } = build_output; ctx = returned_ctx; fb_state = returned_fb_state; @@ -817,12 +868,29 @@ where transition = new_transition; committed_txs = new_committed; + // Record span attributes now that we have results + fb_span.record("tx_count", info.executed_transactions.len() as u64); + fb_span.record("gas_used", info.cumulative_gas_used); + + // Phase 3: Publish + // no .await between check and publish (structural guarantee). + // If resolved or new_fcu fired during the build, skip publishing. 
+            if cancellation.is_resolved() || cancellation.is_new_fcu() {
+                if cancellation.is_resolved() {
+                    ctx.metrics.flashblock_publish_suppressed_total.increment(1);
+                }
+                Self::record_cancellation_reason(&self.metrics, &cancellation, &span);
+                self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span);
+                return Ok(());
+            }
+
             let next_flashblock_state = match build_result {
                 Ok(Some((next_flashblock_state, new_payload))) => {
                     best_payload_tx.send_replace(Some(new_payload));
                     next_flashblock_state
                 }
                 Ok(None) => {
+                    Self::record_cancellation_reason(&self.metrics, &cancellation, &span);
                     self.record_flashblocks_metrics(
                         &ctx,
                         &fb_state,
@@ -833,6 +901,8 @@ where
                     return Ok(());
                 }
                 Err(err) => {
+                    ctx.metrics.payload_job_cancellation_error.increment(1);
+                    span.record("cancellation_reason", "error");
                     error!(
                         target: "payload_builder",
                         id = %fb_payload.payload_id,
@@ -981,8 +1051,7 @@ where
             .collect::>();
         best_txs.mark_committed(new_transactions);
 
-        // We got block cancelled, we won't need anything from the block at this point
-        // Caution: this assume that block cancel token only cancelled when new FCU is received
+        // Block cancelled (new FCU, getPayload resolved, or deadline). Skip publishing.
         if block_cancel.is_cancelled() {
             return Ok(None);
         }
@@ -1033,8 +1102,8 @@ where
         fb_payload.index = flashblock_index;
         fb_payload.base = None;
 
-        // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload
-        // To ensure that we will return same blocks as rollup-boost (to leverage caches)
+        // Block canceled (new FCU, getPayload resolved, or deadline).
+        // Don't publish to ensure every published flashblock is a subset of the resolved payload.
         if block_cancel.is_cancelled() {
             return Ok(None);
         }
@@ -1106,7 +1175,31 @@ where
         }
     }
 
-    /// Do some logging and metric recording when we stop build flashblocks
+    /// Records cancellation reason for observability. 
+ fn record_cancellation_reason( + metrics: &OpRBuilderMetrics, + cancellation: &super::cancellation::PayloadJobCancellation, + span: &tracing::Span, + ) { + let reason = cancellation.reason(); + match reason { + super::cancellation::CancellationReason::Resolved => { + metrics.payload_job_cancellation_resolved.increment(1) + } + super::cancellation::CancellationReason::NewFcu => { + metrics.payload_job_cancellation_new_fcu.increment(1) + } + super::cancellation::CancellationReason::Deadline => { + metrics.payload_job_cancellation_deadline.increment(1) + } + super::cancellation::CancellationReason::Complete => { + metrics.payload_job_cancellation_complete.increment(1) + } + } + span.record("cancellation_reason", reason.as_str()); + } + + /// Do some logging and metric recording when we stop building flashblocks fn record_flashblocks_metrics( &self, ctx: &OpPayloadBuilderCtx, @@ -1141,7 +1234,7 @@ where "Flashblocks building complete" ); - span.record("flashblock_count", fb_state.flashblock_index()); + span.record("flashblocks_built", fb_state.flashblock_index()); } } @@ -1169,7 +1262,15 @@ where .number .is_multiple_of(self.config.sampling_ratio) { - span!(Level::INFO, "build_payload") + span!( + Level::INFO, + "build_payload", + payload_id = tracing::field::Empty, + block_number = args.config.parent_header.number + 1, + parent_hash = %args.config.parent_header.hash(), + flashblocks_built = tracing::field::Empty, + cancellation_reason = tracing::field::Empty, + ) } else { tracing::Span::none() }; @@ -1196,6 +1297,7 @@ where Ok(info) } +#[tracing::instrument(level = "info", name = "seal_block", skip_all)] pub(super) fn build_block( state: &mut State, ctx: &OpPayloadBuilderCtx, @@ -1247,6 +1349,7 @@ where let mut trie_updates_to_cache: Option> = None; if calculate_state_root { + let _state_root_span = span!(Level::INFO, "state_root").entered(); let state_provider = state.database.as_ref(); // prev_trie_updates is None for the first flashblock. 
diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index d89addfa..7739c86c 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -183,6 +183,18 @@ pub struct OpRBuilderMetrics { pub backrun_transaction_processing_duration: Histogram, /// Latest backrun transaction processing duration pub backrun_transaction_processing_gauge: Gauge, + /// Builds completed but not published due to resolved gate + pub flashblock_publish_suppressed_total: Counter, + /// Payload job ended because getPayload resolved + pub payload_job_cancellation_resolved: Counter, + /// Payload job ended because a new FCU arrived + pub payload_job_cancellation_new_fcu: Counter, + /// Payload job ended because the deadline was reached + pub payload_job_cancellation_deadline: Counter, + /// Payload job completed normally (all flashblocks built) + pub payload_job_cancellation_complete: Counter, + /// Payload job ended due to a build error + pub payload_job_cancellation_error: Counter, } impl OpRBuilderMetrics { From 628c35b6eb08dfe5e6195ef63c2ad421ccb4b27f Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:48:55 +0800 Subject: [PATCH 3/7] add payload job cancellation reason log --- crates/op-rbuilder/src/builder/payload.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index 9ad6b560..43563e8d 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -1197,6 +1197,11 @@ where } } span.record("cancellation_reason", reason.as_str()); + info!( + target: "payload_builder", + cancellation_reason = reason.as_str(), + "Payload job cancelled" + ); } /// Do some logging and metric recording when we stop building flashblocks From c13537bb37ff699bcdeb2401b8e355b430b3570c Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> 
Date: Fri, 20 Mar 2026 16:20:39 +0800 Subject: [PATCH 4/7] refactor PayloadJobCancellation --- .../op-rbuilder/src/builder/cancellation.rs | 191 +++++++++--------- crates/op-rbuilder/src/builder/generator.rs | 2 +- crates/op-rbuilder/src/builder/payload.rs | 74 ++++--- 3 files changed, 128 insertions(+), 139 deletions(-) diff --git a/crates/op-rbuilder/src/builder/cancellation.rs b/crates/op-rbuilder/src/builder/cancellation.rs index 8edc8fc2..e7fe3a47 100644 --- a/crates/op-rbuilder/src/builder/cancellation.rs +++ b/crates/op-rbuilder/src/builder/cancellation.rs @@ -1,126 +1,115 @@ -use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +use std::sync::{ + Arc, + atomic::{AtomicU8, Ordering}, +}; +use tokio_util::sync::CancellationToken; -/// Structured cancellation for a single payload building job. -/// -/// Three distinct tokens that can distinguish *why* building was stopped: -/// - `new_fcu`: A new FCU arrived (`ensure_only_one_payload`). Abandon all work. -/// - `resolved`: `getPayload` was called. Stop building, don't publish new flashblocks. -/// - `deadline`: The payload job deadline was reached. Stop all work. -/// -/// `any` fires when ANY of the above fires. It is used for tasks that should stop regardless of cancellation reason. -/// -/// Use `cancel_new_fcu()`, `cancel_resolved()`, `cancel_deadline()` to fire -/// a specific source, these also cancel `any` automatically. -/// -/// These fields must remains private to enforce the invariant that `any` is always canceled -/// alongside any specific token. Use accessor methods to read token state. -#[derive(Clone)] -pub(crate) struct PayloadJobCancellation { - new_fcu: CancellationToken, - resolved: CancellationToken, - deadline: CancellationToken, - any: CancellationToken, -} +const REASON_NONE: u8 = 0; -/// Why a payload job was canceled. -#[derive(Debug, Clone, Copy)] +/// Why a payload job was cancelled. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] pub(crate) enum CancellationReason { - Resolved, - NewFcu, - Deadline, - /// Job completed normally (all scheduled flashblocks built before resolve/fcu). - Complete, + Resolved = 1, + NewFcu = 2, + Deadline = 3, } impl CancellationReason { - pub(crate) fn as_str(self) -> &'static str { - match self { - Self::Resolved => "resolved", - Self::NewFcu => "new_fcu", - Self::Deadline => "deadline", - Self::Complete => "complete", + fn from_u8(v: u8) -> Option { + match v { + 1 => Some(Self::Resolved), + 2 => Some(Self::NewFcu), + 3 => Some(Self::Deadline), + _ => None, } } } +/// Structured cancellation for a single payload building job. +/// +/// A `CancellationToken` with an atomic reason that records *why* the job was stopped: +/// - `Resolved`: `getPayload` was called. Stop building, don't publish new flashblocks. +/// - `NewFcu`: A new FCU arrived (`ensure_only_one_payload`). Abandon all work. +/// - `Deadline`: The payload job deadline was reached. Stop all work. +/// +/// Use `cancel_resolved()`, `cancel_new_fcu()`, `cancel_deadline()` to fire the token with specific reason. +#[derive(Clone)] +pub(crate) struct PayloadJobCancellation { + token: CancellationToken, + reason: Arc, +} + impl PayloadJobCancellation { - /// Creates a new `PayloadJobCancellation` with all tokens uncancelled. + /// Creates a new `PayloadJobCancellation` with the token uncancelled. pub(crate) fn new() -> Self { Self { - new_fcu: CancellationToken::new(), - resolved: CancellationToken::new(), - deadline: CancellationToken::new(), - any: CancellationToken::new(), + token: CancellationToken::new(), + reason: Arc::new(AtomicU8::new(REASON_NONE)), } } - /// Fires `new_fcu` token and `any`. + fn cancel_with(&self, reason: CancellationReason) { + // First writer wins. If already set, the original reason is preserved. 
+ let _ = self.reason.compare_exchange( + REASON_NONE, + reason as u8, + Ordering::AcqRel, + Ordering::Acquire, + ); + self.token.cancel(); + } + + /// Cancel with `NewFcu` reason. pub(crate) fn cancel_new_fcu(&self) { - self.new_fcu.cancel(); - self.any.cancel(); + self.cancel_with(CancellationReason::NewFcu); } - /// Fires `resolved` token and `any`. + /// Cancel with `Resolved` reason. pub(crate) fn cancel_resolved(&self) { - self.resolved.cancel(); - self.any.cancel(); + self.cancel_with(CancellationReason::Resolved); } - /// Fires `deadline` token and `any`. + /// Cancel with `Deadline` reason. pub(crate) fn cancel_deadline(&self) { - self.deadline.cancel(); - self.any.cancel(); + self.cancel_with(CancellationReason::Deadline); } /// Returns true if any cancellation source has fired. pub(crate) fn is_cancelled(&self) -> bool { - self.any.is_cancelled() + self.token.is_cancelled() } - /// Returns true if `resolved` specifically was cancelled. + /// Returns true if cancelled with `Resolved` reason. pub(crate) fn is_resolved(&self) -> bool { - self.resolved.is_cancelled() + self.reason() == Some(CancellationReason::Resolved) } - /// Returns true if `new_fcu` specifically was cancelled. + /// Returns true if cancelled with `NewFcu` reason. pub(crate) fn is_new_fcu(&self) -> bool { - self.new_fcu.is_cancelled() + self.reason() == Some(CancellationReason::NewFcu) } - /// Future that resolves when `resolved` is cancelled. - pub(crate) fn resolved_cancelled(&self) -> WaitForCancellationFuture<'_> { - self.resolved.cancelled() + /// Future that resolves when cancelled (any reason). + pub(crate) fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> { + self.token.cancelled() } - /// Future that resolves when `new_fcu` is cancelled. - pub(crate) fn new_fcu_cancelled(&self) -> WaitForCancellationFuture<'_> { - self.new_fcu.cancelled() - } - - /// Returns the `any` token. + /// Returns the underlying token. /// Passed to blocking tasks and the scheduler. 
- pub(crate) fn any_token(&self) -> CancellationToken { - self.any.clone() + pub(crate) fn token(&self) -> CancellationToken { + self.token.clone() } - /// Creates a child token of `any`. + /// Creates a child token. /// Useful for per-flashblock cancellation. - #[allow(dead_code)] pub(crate) fn child_token(&self) -> CancellationToken { - self.any.child_token() - } - - /// Returns the reason this job was canceled, or `Complete` if not canceled. - pub(crate) fn reason(&self) -> CancellationReason { - if self.resolved.is_cancelled() { - CancellationReason::Resolved - } else if self.new_fcu.is_cancelled() { - CancellationReason::NewFcu - } else if self.deadline.is_cancelled() { - CancellationReason::Deadline - } else { - CancellationReason::Complete - } + self.token.child_token() + } + + /// Returns the reason this job was cancelled, or `None` if not cancelled. + pub(crate) fn reason(&self) -> Option { + CancellationReason::from_u8(self.reason.load(Ordering::Acquire)) } } @@ -133,10 +122,8 @@ impl Default for PayloadJobCancellation { impl std::fmt::Debug for PayloadJobCancellation { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PayloadJobCancellation") - .field("new_fcu", &self.new_fcu.is_cancelled()) - .field("resolved", &self.resolved.is_cancelled()) - .field("deadline", &self.deadline.is_cancelled()) - .field("any", &self.any.is_cancelled()) + .field("cancelled", &self.token.is_cancelled()) + .field("reason", &self.reason()) .finish() } } @@ -147,7 +134,7 @@ mod tests { use tokio::time::{Duration, timeout}; #[tokio::test] - async fn test_cancel_new_fcu_fires_any() { + async fn test_cancel_new_fcu() { let cancel = PayloadJobCancellation::new(); assert!(!cancel.is_cancelled()); @@ -155,46 +142,46 @@ mod tests { assert!(cancel.is_cancelled()); assert!(cancel.is_new_fcu()); assert!(!cancel.is_resolved()); - assert!(matches!(cancel.reason(), CancellationReason::NewFcu)); + assert_eq!(cancel.reason(), 
Some(CancellationReason::NewFcu)); } #[tokio::test] - async fn test_cancel_resolved_fires_any() { + async fn test_cancel_resolved() { let cancel = PayloadJobCancellation::new(); cancel.cancel_resolved(); assert!(cancel.is_cancelled()); assert!(cancel.is_resolved()); assert!(!cancel.is_new_fcu()); - assert!(matches!(cancel.reason(), CancellationReason::Resolved)); + assert_eq!(cancel.reason(), Some(CancellationReason::Resolved)); } #[tokio::test] - async fn test_cancel_deadline_fires_any() { + async fn test_cancel_deadline() { let cancel = PayloadJobCancellation::new(); cancel.cancel_deadline(); assert!(cancel.is_cancelled()); assert!(!cancel.is_new_fcu()); assert!(!cancel.is_resolved()); - assert!(matches!(cancel.reason(), CancellationReason::Deadline)); + assert_eq!(cancel.reason(), Some(CancellationReason::Deadline)); } #[tokio::test] - async fn test_any_awaitable() { + async fn test_awaitable() { let cancel = PayloadJobCancellation::new(); - let any = cancel.any_token(); + let token = cancel.token(); tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(10)).await; cancel.cancel_resolved(); }); - timeout(Duration::from_millis(100), any.cancelled()) + timeout(Duration::from_millis(100), token.cancelled()) .await - .expect("any should fire when resolved fires"); + .expect("token should fire when resolved fires"); } #[tokio::test] - async fn test_child_token_cancelled_by_any() { + async fn test_child_token_cancelled() { let cancel = PayloadJobCancellation::new(); let child = cancel.child_token(); assert!(!child.is_cancelled()); @@ -204,8 +191,16 @@ mod tests { } #[tokio::test] - async fn test_reason_complete_when_not_cancelled() { + async fn test_reason_none_when_not_cancelled() { let cancel = PayloadJobCancellation::new(); - assert!(matches!(cancel.reason(), CancellationReason::Complete)); + assert_eq!(cancel.reason(), None); + } + + #[tokio::test] + async fn test_first_reason_wins() { + let cancel = PayloadJobCancellation::new(); + 
cancel.cancel_resolved(); + cancel.cancel_new_fcu(); + assert_eq!(cancel.reason(), Some(CancellationReason::Resolved)); } } diff --git a/crates/op-rbuilder/src/builder/generator.rs b/crates/op-rbuilder/src/builder/generator.rs index 467819d7..2e6c0241 100644 --- a/crates/op-rbuilder/src/builder/generator.rs +++ b/crates/op-rbuilder/src/builder/generator.rs @@ -567,7 +567,7 @@ mod tests { self.new_event(BlockEvent::Started); best_payload_tx.send_replace(Some(MockPayload(1))); - args.cancel.any_token().cancelled().await; + args.cancel.cancelled().await; self.new_event(BlockEvent::Cancelled); Ok(()) } diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index 43563e8d..89e116af 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -480,10 +480,8 @@ where let BuildArguments { cached_reads, config, - cancel: cancellation, + cancel: payload_cancel, } = args; - // Use `any` as the general cancellation token - let block_cancel = cancellation.any_token(); // The build_payload span is created and instrumented in try_build() using // tracing::Instrument, which safely manages it across async .await points. 
@@ -497,7 +495,7 @@ where let enable_incremental_state_root = self.config.flashblocks_config.enable_incremental_state_root; let ctx = self - .get_op_payload_builder_ctx(config.clone(), block_cancel.clone()) + .get_op_payload_builder_ctx(config.clone(), payload_cancel.token()) .map_err(|e| PayloadBuilderError::Other(e.into()))?; // Initialize flashblocks state for this block @@ -709,7 +707,7 @@ where ..fb_state }; - let fb_cancel = block_cancel.child_token(); + let fb_cancel = payload_cancel.child_token(); let mut ctx = self .get_op_payload_builder_ctx(config, fb_cancel.clone()) .map_err(|e| PayloadBuilderError::Other(e.into()))?; @@ -720,7 +718,7 @@ where .flashblock_timer .instrument(flashblock_scheduler.run( tx, - block_cancel.clone(), + payload_cancel.token(), fb_cancel, fb_payload.payload_id, )), @@ -738,13 +736,8 @@ where let new_fb_cancel = tokio::select! { // ensures cancellation is checked before trigger. biased; - _ = cancellation.resolved_cancelled() => { - Self::record_cancellation_reason(&self.metrics, &cancellation, &span); - self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); - return Ok(()); - } - _ = cancellation.new_fcu_cancelled() => { - Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + _ = payload_cancel.cancelled() => { + Self::record_cancellation_reason(&self.metrics, &payload_cancel, &span); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); return Ok(()); } @@ -752,7 +745,7 @@ where Some(t) => t, None => { // Channel closed — scheduler exhausted or canceled - Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + Self::record_cancellation_reason(&self.metrics, &payload_cancel, &span); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); return Ok(()); } @@ -788,20 +781,18 @@ where // is dropped (the thread finishes but the oneshot result is discarded). let build_output = tokio::select! 
{ biased; - // Suppressed flashblock: we received getResolve during flashblock building - _ = cancellation.resolved_cancelled() => { - self.metrics.flashblock_publish_suppressed_total.increment(1); - Self::record_cancellation_reason(&self.metrics, &cancellation, &span); - return Ok(()); - } - _ = cancellation.new_fcu_cancelled() => { - Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + _ = payload_cancel.cancelled() => { + if payload_cancel.is_resolved() { + // Suppressed flashblock: we received getResolve during flashblock building + self.metrics.flashblock_publish_suppressed_total.increment(1); + } + Self::record_cancellation_reason(&self.metrics, &payload_cancel, &span); return Ok(()); } result = self.run_blocking_task({ let builder = self.clone(); let ctx = ctx; - let block_cancel = block_cancel.clone(); + let block_cancel = payload_cancel.token(); let info = info; let cache = cache; let transition = transition; @@ -875,11 +866,11 @@ where // Phase 3: Publish // no .await between check and publish (structural guarantee). // If resolved or new_fcu fired during the build, skip publishing. 
- if cancellation.is_resolved() || cancellation.is_new_fcu() { - if cancellation.is_resolved() { + if payload_cancel.is_resolved() || payload_cancel.is_new_fcu() { + if payload_cancel.is_resolved() { ctx.metrics.flashblock_publish_suppressed_total.increment(1); } - Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + Self::record_cancellation_reason(&self.metrics, &payload_cancel, &span); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); return Ok(()); } @@ -890,7 +881,7 @@ where next_flashblock_state } Ok(None) => { - Self::record_cancellation_reason(&self.metrics, &cancellation, &span); + Self::record_cancellation_reason(&self.metrics, &payload_cancel, &span); self.record_flashblocks_metrics( &ctx, &fb_state, @@ -1181,25 +1172,28 @@ where cancellation: &super::cancellation::PayloadJobCancellation, span: &tracing::Span, ) { - let reason = cancellation.reason(); - match reason { - super::cancellation::CancellationReason::Resolved => { - metrics.payload_job_cancellation_resolved.increment(1) + let reason_str = match cancellation.reason() { + Some(super::cancellation::CancellationReason::Resolved) => { + metrics.payload_job_cancellation_resolved.increment(1); + "resolved" } - super::cancellation::CancellationReason::NewFcu => { - metrics.payload_job_cancellation_new_fcu.increment(1) + Some(super::cancellation::CancellationReason::NewFcu) => { + metrics.payload_job_cancellation_new_fcu.increment(1); + "new_fcu" } - super::cancellation::CancellationReason::Deadline => { - metrics.payload_job_cancellation_deadline.increment(1) + Some(super::cancellation::CancellationReason::Deadline) => { + metrics.payload_job_cancellation_deadline.increment(1); + "deadline" } - super::cancellation::CancellationReason::Complete => { - metrics.payload_job_cancellation_complete.increment(1) + None => { + metrics.payload_job_cancellation_complete.increment(1); + "complete" } - } - span.record("cancellation_reason", reason.as_str()); + 
}; + span.record("cancellation_reason", reason_str); info!( target: "payload_builder", - cancellation_reason = reason.as_str(), + cancellation_reason = reason_str, "Payload job cancelled" ); } From fa5744875b0ed27e96e38f663ed36e1dfd673759 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Tue, 24 Mar 2026 16:04:51 +0900 Subject: [PATCH 5/7] build_fallback_block function --- crates/op-rbuilder/src/builder/payload.rs | 132 ++++++++++++---------- 1 file changed, 72 insertions(+), 60 deletions(-) diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index 89e116af..484a9a0b 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -43,7 +43,10 @@ use reth_provider::{ HashedPostStateProvider, ProviderError, StateRootProvider, StorageRootProvider, }; use reth_revm::{ - State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, + State, + cached::CachedReads, + database::StateProviderDatabase, + db::{CacheState, TransitionState, states::bundle_state::BundleRetention}, }; use reth_transaction_pool::TransactionPool; use reth_trie::{HashedPostState, TrieInput, updates::TrieUpdates}; @@ -51,7 +54,7 @@ use revm::Database; use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Instant}; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, metadata::Level, span, warn}; +use tracing::{debug, error, info, info_span, metadata::Level, span, warn}; /// Converts a reth OpReceipt to an op-alloy OpReceipt /// TODO: remove this once reth updates to use the op-alloy defined type as well. 
@@ -513,7 +516,7 @@ where let fallback_span = if span.is_none() { tracing::Span::none() } else { - span!(parent: &span, Level::INFO, "build_fallback") + info_span!(parent: &span, "build_fallback") }; let FallbackBuildOutput { ctx, @@ -526,64 +529,10 @@ where } = tracing::Instrument::instrument( self.run_blocking_task({ let builder = self.clone(); - let ctx = ctx; - let mut fb_state = fb_state; move || { - let state_provider = builder.client.state_by_block_hash(ctx.parent().hash())?; - let db = StateProviderDatabase::new(&state_provider); - - // 1. execute the pre steps and seal an early block with that - let sequencer_tx_start_time = Instant::now(); - let mut cached_reads = cached_reads; - let mut state = State::builder() - .with_database(cached_reads.as_db_mut(db)) - .with_bundle_update() - .build(); - - let mut info = execute_pre_steps(&mut state, &ctx)?; - let sequencer_tx_time = sequencer_tx_start_time.elapsed(); - ctx.metrics.sequencer_tx_duration.record(sequencer_tx_time); - ctx.metrics.sequencer_tx_gauge.set(sequencer_tx_time); - - // We add first builder tx right after deposits - if !ctx.attributes().no_tx_pool - && let Err(e) = builder.builder_tx.add_builder_txs( - &state_provider, - &mut info, - &ctx, - &mut state, - false, - fb_state.is_first_flashblock(), - fb_state.is_last_flashblock(), - ) - { - error!( - target: "payload_builder", - "Error adding builder txs to fallback block: {}", - e - ); - }; - - let (payload, fb_payload) = build_block( - &mut state, - &ctx, - Some(&mut fb_state), - &mut info, - !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync - )?; - - // we can safely take from state as we drop it at the end of the scope - let cache = std::mem::take(&mut state.cache); - let transition = state.transition_state.take(); - Ok(FallbackBuildOutput { - ctx, - info, - payload, - fb_payload, - cache, - transition, - fb_state, - }) + builder + .build_fallback_block(ctx, fb_state, cached_reads, 
disable_state_root) + .map_err(|e| PayloadBuilderError::Other(e.into())) } }), fallback_span, @@ -910,6 +859,69 @@ where } } + /// Execute the pre-steps and seal an early fallback block + fn build_fallback_block( + &self, + ctx: OpPayloadBuilderCtx, + mut fb_state: FlashblocksState, + mut cached_reads: CachedReads, + disable_state_root: bool, + ) -> eyre::Result>> { + let state_provider = self.client.state_by_block_hash(ctx.parent().hash())?; + let db = StateProviderDatabase::new(&state_provider); + + let sequencer_tx_start_time = Instant::now(); + let mut state = State::builder() + .with_database(cached_reads.as_db_mut(db)) + .with_bundle_update() + .build(); + + let mut info = execute_pre_steps(&mut state, &ctx)?; + let sequencer_tx_time = sequencer_tx_start_time.elapsed(); + ctx.metrics.sequencer_tx_duration.record(sequencer_tx_time); + ctx.metrics.sequencer_tx_gauge.set(sequencer_tx_time); + + // We add first builder tx right after deposits + if !ctx.attributes().no_tx_pool + && let Err(e) = self.builder_tx.add_builder_txs( + &state_provider, + &mut info, + &ctx, + &mut state, + false, + fb_state.is_first_flashblock(), + fb_state.is_last_flashblock(), + ) + { + error!( + target: "payload_builder", + "Error adding builder txs to fallback block: {}", + e + ); + }; + + let (payload, fb_payload) = build_block( + &mut state, + &ctx, + Some(&mut fb_state), + &mut info, + !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync + )?; + + // we can safely take from state as we drop it at the end of the scope + let cache = std::mem::take(&mut state.cache); + let transition = state.transition_state.take(); + Ok(FallbackBuildOutput { + ctx, + info, + payload, + fb_payload, + cache, + transition, + fb_state, + }) + } + #[expect(clippy::too_many_arguments)] fn build_next_flashblock< 'a, From 9d150160902e8a75531e50d7f3be5576a22ceb52 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Tue, 24 Mar 2026 
16:12:40 +0900 Subject: [PATCH 6/7] TaskSpawnerExt::run_blocking_task --- crates/op-rbuilder/src/builder/payload.rs | 22 +++------------- crates/op-rbuilder/src/lib.rs | 1 + crates/op-rbuilder/src/task_ext.rs | 32 +++++++++++++++++++++++ 3 files changed, 37 insertions(+), 18 deletions(-) create mode 100644 crates/op-rbuilder/src/task_ext.rs diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index 484a9a0b..c63256f8 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -12,6 +12,7 @@ use crate::{ gas_limiter::AddressGasLimiter, metrics::OpRBuilderMetrics, primitives::reth::ExecutionInfo, + task_ext::TaskSpawnerExt, tokio_metrics::FlashblocksTaskMetrics, traits::{ClientBounds, PoolBounds}, }; @@ -52,7 +53,7 @@ use reth_transaction_pool::TransactionPool; use reth_trie::{HashedPostState, TrieInput, updates::TrieUpdates}; use revm::Database; use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Instant}; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, watch}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, info_span, metadata::Level, span, warn}; @@ -381,21 +382,6 @@ where BuilderTx: BuilderTransactions + Send + Sync + 'static, Tasks: TaskSpawner + Clone + Send + Sync + 'static, { - /// Helper to spawn a blocking task that returns T in a oneshot channel - async fn run_blocking_task(&self, task: F) -> Result - where - T: Send + 'static, - F: FnOnce() -> Result + Send + 'static, - { - let (tx, rx) = oneshot::channel(); - self.executor.spawn_blocking_task(Box::pin(async move { - let _ = tx.send(task()); - })); - - rx.await - .map_err(|_| PayloadBuilderError::Other("blocking task dropped".into()))? 
- } - fn get_op_payload_builder_ctx( &self, config: reth_basic_payload_builder::PayloadConfig< @@ -527,7 +513,7 @@ where mut transition, fb_state: returned_fb_state, } = tracing::Instrument::instrument( - self.run_blocking_task({ + self.executor.run_blocking_task({ let builder = self.clone(); move || { builder @@ -738,7 +724,7 @@ where Self::record_cancellation_reason(&self.metrics, &payload_cancel, &span); return Ok(()); } - result = self.run_blocking_task({ + result = self.executor.run_blocking_task({ let builder = self.clone(); let ctx = ctx; let block_cancel = payload_cancel.token(); diff --git a/crates/op-rbuilder/src/lib.rs b/crates/op-rbuilder/src/lib.rs index e39b3546..2639a5c1 100644 --- a/crates/op-rbuilder/src/lib.rs +++ b/crates/op-rbuilder/src/lib.rs @@ -8,6 +8,7 @@ pub mod metrics; mod monitor_tx_pool; pub mod primitives; pub mod revert_protection; +pub(crate) mod task_ext; pub mod tokio_metrics; pub mod traits; pub mod tx; diff --git a/crates/op-rbuilder/src/task_ext.rs b/crates/op-rbuilder/src/task_ext.rs new file mode 100644 index 00000000..0f16efcc --- /dev/null +++ b/crates/op-rbuilder/src/task_ext.rs @@ -0,0 +1,32 @@ +use reth::tasks::TaskSpawner; +use reth_node_api::PayloadBuilderError; +use tokio::sync::oneshot; + +/// Extension trait for [`TaskSpawner`] that adds a blocking task helper +/// returning the result via a oneshot channel. +pub(crate) trait TaskSpawnerExt: TaskSpawner { + /// Spawns a blocking task and awaits its result. + /// + /// The closure runs on a blocking thread. The result is sent back via a + /// oneshot channel so the caller can `.await` it from an async context. 
+    fn run_blocking_task<T, F>(
+        &self,
+        task: F,
+    ) -> impl std::future::Future<Output = Result<T, PayloadBuilderError>>
+    where
+        T: Send + 'static,
+        F: FnOnce() -> Result<T, PayloadBuilderError> + Send + 'static,
+    {
+        let (tx, rx) = oneshot::channel();
+        self.spawn_blocking_task(Box::pin(async move {
+            let _ = tx.send(task());
+        }));
+
+        async {
+            rx.await
+                .map_err(|_| PayloadBuilderError::Other("blocking task dropped".into()))?
+        }
+    }
+}
+
+impl<T: TaskSpawner> TaskSpawnerExt for T {}

From f236ac4d5da5fdbfd8cd7f90505ab455bbfbc0f6 Mon Sep 17 00:00:00 2001
From: julio4 <30329843+julio4@users.noreply.github.com>
Date: Tue, 24 Mar 2026 16:49:46 +0900
Subject: [PATCH 7/7] ash suggestions

---
 crates/op-rbuilder/src/builder/context.rs   | 2 +-
 crates/op-rbuilder/src/builder/generator.rs | 3 ++-
 crates/op-rbuilder/src/builder/payload.rs   | 4 ++--
 crates/op-rbuilder/src/builder/service.rs   | 7 +++----
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/crates/op-rbuilder/src/builder/context.rs b/crates/op-rbuilder/src/builder/context.rs
index b1167f3f..bb56f541 100644
--- a/crates/op-rbuilder/src/builder/context.rs
+++ b/crates/op-rbuilder/src/builder/context.rs
@@ -401,7 +401,7 @@ impl OpPayloadBuilderCtx {
     ///
     /// Returns `Ok(Some(()))` if the job was cancelled.
#[expect(clippy::too_many_arguments)] - #[tracing::instrument(level = "info", name = "execute_pool_txs", skip_all)] + #[tracing::instrument(level = "info", skip_all)] pub(super) fn execute_best_transactions( &self, info: &mut ExecutionInfo, diff --git a/crates/op-rbuilder/src/builder/generator.rs b/crates/op-rbuilder/src/builder/generator.rs index 2e6c0241..a253b17e 100644 --- a/crates/op-rbuilder/src/builder/generator.rs +++ b/crates/op-rbuilder/src/builder/generator.rs @@ -331,8 +331,9 @@ where cancel: cancellation, }; + let payload_id = args.config.attributes.payload_id(); if let Err(e) = builder.try_build(args, watch_tx).await { - tracing::error!("build task failed: {:?}", e); + tracing::error!(id = %payload_id, "build task failed: {:?}", e); } })); } diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index c63256f8..f397493a 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -280,7 +280,7 @@ pub(super) struct OpPayloadBuilderInner { built_payload_tx: mpsc::Sender, /// WebSocket publisher for broadcasting flashblocks /// to all connected subscribers. 
-    ws_pub: Arc<WebSocketPublisher>,
+    ws_pub: WebSocketPublisher,
     /// System configuration for the builder
     config: BuilderConfig,
     /// The metrics for the builder
@@ -321,7 +321,7 @@ impl OpPayloadBuilder
         built_payload_tx: mpsc::Sender,
-        ws_pub: Arc<WebSocketPublisher>,
+        ws_pub: WebSocketPublisher,
         metrics: Arc<OpRBuilderMetrics>,
         task_metrics: Arc<FlashblocksTaskMetrics>,
         executor: Tasks,
diff --git a/crates/op-rbuilder/src/builder/service.rs b/crates/op-rbuilder/src/builder/service.rs
index b1b4fafe..50b922b8 100644
--- a/crates/op-rbuilder/src/builder/service.rs
+++ b/crates/op-rbuilder/src/builder/service.rs
@@ -109,14 +109,13 @@
         let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16);
         let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16);
-        let ws_pub: Arc<WebSocketPublisher> = WebSocketPublisher::new(
+        let ws_pub = WebSocketPublisher::new(
            flashblocks_config.ws_addr,
            metrics.clone(),
            &task_metrics.websocket_publisher,
            flashblocks_config.ws_subscriber_limit,
        )
-        .wrap_err("failed to create ws publisher")?
-        .into();
+        .wrap_err("failed to create ws publisher")?;
         let payload_builder = OpPayloadBuilder::new(
             OpEvmConfig::optimism(ctx.chain_spec()),
             pool,
@@ -125,7 +124,7 @@
             builder_tx,
             built_fb_payload_tx,
             built_payload_tx,
-            ws_pub.clone(),
+            ws_pub,
             metrics.clone(),
             task_metrics.clone(),
             ctx.task_executor().clone(),