From 746ae6c769b46f125855a508def812303b97b1a8 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Fri, 27 Feb 2026 15:56:41 +0800 Subject: [PATCH 1/4] feat: support multiple flashblock websockets as fallback --- rust/op-reth/crates/flashblocks/src/lib.rs | 2 +- rust/op-reth/crates/flashblocks/src/ws/mod.rs | 3 + .../crates/flashblocks/src/ws/multi.rs | 356 ++++++++++++++++++ rust/op-reth/crates/node/src/args.rs | 15 +- rust/op-reth/crates/node/src/node.rs | 22 +- rust/op-reth/crates/rpc/src/eth/mod.rs | 33 +- 6 files changed, 398 insertions(+), 33 deletions(-) create mode 100644 rust/op-reth/crates/flashblocks/src/ws/multi.rs diff --git a/rust/op-reth/crates/flashblocks/src/lib.rs b/rust/op-reth/crates/flashblocks/src/lib.rs index e8118bfce525b..c2fb88e885fd0 100644 --- a/rust/op-reth/crates/flashblocks/src/lib.rs +++ b/rust/op-reth/crates/flashblocks/src/lib.rs @@ -50,7 +50,7 @@ pub use tx_cache::TransactionCache; mod test_utils; mod ws; -pub use ws::{FlashBlockDecoder, WsConnect, WsFlashBlockStream}; +pub use ws::{FlashBlockDecoder, MultiSourceFlashBlockStream, WsConnect, WsFlashBlockStream}; /// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s. /// diff --git a/rust/op-reth/crates/flashblocks/src/ws/mod.rs b/rust/op-reth/crates/flashblocks/src/ws/mod.rs index 651d83c916b1a..dc4fff32d1023 100644 --- a/rust/op-reth/crates/flashblocks/src/ws/mod.rs +++ b/rust/op-reth/crates/flashblocks/src/ws/mod.rs @@ -3,4 +3,7 @@ pub use stream::{WsConnect, WsFlashBlockStream}; mod decoding; pub use decoding::FlashBlockDecoder; +mod multi; +pub use multi::MultiSourceFlashBlockStream; + mod stream; diff --git a/rust/op-reth/crates/flashblocks/src/ws/multi.rs b/rust/op-reth/crates/flashblocks/src/ws/multi.rs new file mode 100644 index 0000000000000..3cb31d31acbb1 --- /dev/null +++ b/rust/op-reth/crates/flashblocks/src/ws/multi.rs @@ -0,0 +1,356 @@ +use crate::FlashBlock; +use alloy_primitives::B256; +use alloy_rpc_types_engine::PayloadId; +use futures_util::Stream; +use metrics::{Counter, Gauge}; +use reth_metrics::Metrics; +use std::{ + collections::{hash_map::Entry, HashMap}, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use tokio::time::Sleep; +use tracing::warn; + +/// Backoff duration applied per-source when a stream error occurs. +const PER_SOURCE_BACKOFF: Duration = Duration::from_millis(500); + +/// Dedup key: uniquely identifies a flashblock within a block. +type DedupKey = (PayloadId, u64); + +/// Per-source state tracker. +#[derive(Debug)] +struct SourceState { + index: usize, + terminated: bool, + backoff: Option>>, +} + +/// A multi-source flashblock stream that wraps multiple inner streams. +/// +/// Deduplicates flashblocks by `(payload_id, index)`. The first-arriving +/// flashblock for each key wins. Subsequent duplicates are cross-validated: +/// +/// Each source has independent error backoff. When a source errors, it backs +/// off for [`PER_SOURCE_BACKOFF`] before being polled again. Other sources +/// continue serving flashblocks during this time. +#[derive(Debug)] +pub struct MultiSourceFlashBlockStream { + sources: Vec<(S, SourceState)>, + seen: HashMap, + current_payload_id: Option, + poll_offset: usize, + metrics: MultiSourceMetrics, +} + +impl MultiSourceFlashBlockStream { + /// Creates a new multi-source stream wrapping the given inner streams. + pub fn new(streams: Vec) -> Self { + let len = streams.len(); + let sources = streams + .into_iter() + .enumerate() + .map(|(i, s)| (s, SourceState { index: i, terminated: false, backoff: None })) + .collect(); + let metrics = MultiSourceMetrics::default(); + metrics.total_sources.set(len as f64); + metrics.active_sources.set(len as f64); + Self { sources, seen: HashMap::new(), current_payload_id: None, poll_offset: 0, metrics } + } + + /// Updates the active sources gauge metric. + fn update_active_sources_metric(&self) { + let active = self.sources.iter().filter(|(_, s)| !s.terminated).count(); + self.metrics.active_sources.set(active as f64); + } +} + +impl Stream for MultiSourceFlashBlockStream +where + S: Stream> + Unpin, +{ + type Item = eyre::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let num_sources = this.sources.len(); + + if num_sources == 0 { + return Poll::Ready(None); + } + + // Rotate start index + let start = this.poll_offset; + this.poll_offset = (this.poll_offset + 1) % num_sources; + + for i in 0..num_sources { + let idx = (start + i) % num_sources; + let (stream, state) = &mut this.sources[idx]; + + if state.terminated { + continue; + } + + if let Some(backoff) = &mut state.backoff { + match backoff.as_mut().poll(cx) { + Poll::Ready(()) => { + state.backoff = None; + } + Poll::Pending => { + continue; + } + } + } + + match Pin::new(stream).poll_next(cx) { + Poll::Ready(Some(Ok(flashblock))) => { + // Detect new block: index 0 with a new payload_id resets dedup state + if flashblock.index == 0 && + this.current_payload_id.as_ref() != Some(&flashblock.payload_id) + { + this.seen.clear(); + this.current_payload_id = Some(flashblock.payload_id); + } + + let key = (flashblock.payload_id, flashblock.index); + + match this.seen.entry(key) { + Entry::Vacant(entry) => { + // First arrival + entry.insert(flashblock.diff.block_hash); + return Poll::Ready(Some(Ok(flashblock))); + } + Entry::Occupied(entry) => { + // Duplicate - cross-validate and skip + this.metrics.deduplicated_total.increment(1); + let existing_hash = *entry.get(); + if existing_hash != flashblock.diff.block_hash { + warn!( + target: "flashblocks", + source = state.index, + payload_id = %flashblock.payload_id, + index = flashblock.index, + expected_hash = %existing_hash, + actual_hash = %flashblock.diff.block_hash, + "Cross-validation mismatch: block hash differs between sources" + ); + this.metrics.mismatch_total.increment(1); + } + } + } + } + Poll::Ready(Some(Err(err))) => { + warn!( + target: "flashblocks", + source = state.index, + %err, + "Source error, backing off" + ); + state.backoff = Some(Box::pin(tokio::time::sleep(PER_SOURCE_BACKOFF))); + // Register waker on the new backoff timer + if let Some(backoff) = &mut state.backoff { + let _ = backoff.as_mut().poll(cx); + } + } + Poll::Ready(None) => { + state.terminated = true; + warn!(target: "flashblocks", source = state.index, "Source terminated"); + this.update_active_sources_metric(); + } + Poll::Pending => {} + } + } + + if this.sources.iter().all(|(_, s)| s.terminated) { + warn!(target: "flashblocks", "All sources terminated, returning None"); + Poll::Ready(None) + } else { + Poll::Pending + } + } +} + +/// Metrics for [`MultiSourceFlashBlockStream`]. +#[derive(Metrics)] +#[metrics(scope = "flashblock_multi_source")] +struct MultiSourceMetrics { + /// Total flashblocks dropped as duplicates. + deduplicated_total: Counter, + /// Cross-validation hash mismatches detected. + mismatch_total: Counter, + /// Currently active (non-terminated) sources. + active_sources: Gauge, + /// Total configured sources. + total_sources: Gauge, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::TestFlashBlockFactory; + use futures_util::{stream, StreamExt}; + + /// A concrete stream type used in tests to allow mixing ok/error items in the same `Vec`. + type TestStream = stream::Iter>>; + + fn ok_stream(items: Vec) -> TestStream { + stream::iter(items.into_iter().map(Ok).collect::>()) + } + + fn err_then_done(msg: &str) -> TestStream { + stream::iter(vec![Err(eyre::eyre!("{msg}"))]) + } + + fn mixed_stream(items: Vec>) -> TestStream { + stream::iter(items) + } + + #[tokio::test] + async fn test_single_source_passthrough() { + let factory = TestFlashBlockFactory::new(); + let fb0 = factory.flashblock_at(0).build(); + let fb1 = factory.flashblock_after(&fb0).build(); + let fb2 = factory.flashblock_after(&fb1).build(); + + let mut stream = MultiSourceFlashBlockStream::new(vec![ok_stream(vec![ + fb0.clone(), + fb1.clone(), + fb2.clone(), + ])]); + + let r0 = stream.next().await.unwrap().unwrap(); + let r1 = stream.next().await.unwrap().unwrap(); + let r2 = stream.next().await.unwrap().unwrap(); + + assert_eq!(r0, fb0); + assert_eq!(r1, fb1); + assert_eq!(r2, fb2); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_first_arrival_dedup() { + let factory = TestFlashBlockFactory::new(); + let fb0 = factory.flashblock_at(0).build(); + let fb1 = factory.flashblock_after(&fb0).build(); + + // Both sources have the same flashblocks. + let source_a = ok_stream(vec![fb0.clone(), fb1.clone()]); + let source_b = ok_stream(vec![fb0.clone(), fb1.clone()]); + + let stream = MultiSourceFlashBlockStream::new(vec![source_a, source_b]); + let results: Vec<_> = stream.map(Result::unwrap).collect().await; + + // Only 2 unique flashblocks emitted, not 4. + assert_eq!(results.len(), 2); + assert_eq!(results[0], fb0); + assert_eq!(results[1], fb1); + } + + #[tokio::test] + async fn test_cross_validation_mismatch() { + let factory = TestFlashBlockFactory::new(); + let fb_a = factory.flashblock_at(0).block_hash(B256::with_last_byte(1)).build(); + let fb_b = factory + .flashblock_at(0) + .payload_id(fb_a.payload_id) + .block_hash(B256::with_last_byte(2)) + .build(); + + let source_a = ok_stream(vec![fb_a.clone()]); + let source_b = ok_stream(vec![fb_b]); + + let mut stream = MultiSourceFlashBlockStream::new(vec![source_a, source_b]); + + // First arrival wins. + let result = stream.next().await.unwrap().unwrap(); + assert_eq!(result.diff.block_hash, fb_a.diff.block_hash); + + // Duplicate with mismatched hash is dropped (mismatch metric incremented). + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_per_source_error_backoff() { + let factory = TestFlashBlockFactory::new(); + let fb0 = factory.flashblock_at(0).build(); + + // Source A errors, source B delivers fb0. + let source_a = err_then_done("connection lost"); + let source_b = ok_stream(vec![fb0.clone()]); + + let mut stream = MultiSourceFlashBlockStream::new(vec![source_a, source_b]); + + // Source B's flashblock should come through despite source A's error. + let result = stream.next().await.unwrap().unwrap(); + assert_eq!(result, fb0); + } + + #[tokio::test] + async fn test_error_mixed_with_ok() { + let factory = TestFlashBlockFactory::new(); + let fb0 = factory.flashblock_at(0).build(); + let fb1 = factory.flashblock_after(&fb0).build(); + + // Source A: error then fb1. Source B: fb0. + let source_a = mixed_stream(vec![Err(eyre::eyre!("oops")), Ok(fb1.clone())]); + let source_b = ok_stream(vec![fb0.clone()]); + + let mut stream = MultiSourceFlashBlockStream::new(vec![source_a, source_b]); + + // fb0 from source B arrives (source A is backing off). + let result = stream.next().await.unwrap().unwrap(); + assert_eq!(result, fb0); + } + + #[tokio::test] + async fn test_source_termination_liveness() { + let factory = TestFlashBlockFactory::new(); + let fb0 = factory.flashblock_at(0).build(); + let fb1 = factory.flashblock_after(&fb0).build(); + + // Source A has fb0 then ends, source B has fb0 (dup) then fb1. + let source_a = ok_stream(vec![fb0.clone()]); + let source_b = ok_stream(vec![fb0.clone(), fb1.clone()]); + + let stream = MultiSourceFlashBlockStream::new(vec![source_a, source_b]); + let results: Vec<_> = stream.map(Result::unwrap).collect().await; + + assert_eq!(results.len(), 2); + assert_eq!(results[0], fb0); + assert_eq!(results[1], fb1); + } + + #[tokio::test] + async fn test_new_block_resets_dedup() { + let factory = TestFlashBlockFactory::new(); + + // Block 1 + let b1_fb0 = factory.flashblock_at(0).build(); + // Block 2 (new payload_id, index 0) + let b2_fb0 = factory.flashblock_for_next_block(&b1_fb0).build(); + + // Source A: block1 fb0, block2 fb0 + // Source B: block1 fb0 (dup), block2 fb0 (dup) + let source_a = ok_stream(vec![b1_fb0.clone(), b2_fb0.clone()]); + let source_b = ok_stream(vec![b1_fb0.clone(), b2_fb0.clone()]); + + let stream = MultiSourceFlashBlockStream::new(vec![source_a, source_b]); + let results: Vec<_> = stream.map(Result::unwrap).collect().await; + + // 2 unique flashblocks: one per block. + assert_eq!(results.len(), 2); + assert_eq!(results[0].payload_id, b1_fb0.payload_id); + assert_eq!(results[1].payload_id, b2_fb0.payload_id); + } + + #[tokio::test] + async fn test_all_done_returns_none() { + let source_a: TestStream = stream::iter(vec![]); + let source_b: TestStream = stream::iter(vec![]); + + let mut stream = MultiSourceFlashBlockStream::new(vec![source_a, source_b]); + assert!(stream.next().await.is_none()); + } +} diff --git a/rust/op-reth/crates/node/src/args.rs b/rust/op-reth/crates/node/src/args.rs index 4f6e1cc84b58b..3baa042d702d1 100644 --- a/rust/op-reth/crates/node/src/args.rs +++ b/rust/op-reth/crates/node/src/args.rs @@ -70,19 +70,22 @@ pub struct RollupArgs { #[arg(long, default_value_t = 1_000_000)] pub min_suggested_priority_fee: u64, - /// A URL pointing to a secure websocket subscription that streams out flashblocks. + /// URL(s) pointing to secure websocket subscriptions that stream out flashblocks. /// /// If given, the flashblocks are received to build pending block. All request with "pending" /// block tag will use the pending state based on flashblocks. - #[arg(long, alias = "websocket-url")] - pub flashblocks_url: Option, + /// + /// Multiple URLs can be specified for redundancy. + /// Pass multiple comma-separated values for multiple flashblock websocket sources. + #[arg(long = "flashblocks-url", alias = "websocket-url", value_delimiter = ',')] + pub flashblocks_urls: Vec, /// Enable flashblock consensus client to drive the chain forward /// /// When enabled, the flashblock consensus client will process flashblock sequences and submit /// them to the engine API to advance the chain. - /// Requires `flashblocks_url` to be set. - #[arg(long, default_value_t = false, requires = "flashblocks_url")] + /// Requires at least one `flashblocks-url` to be set. + #[arg(long, default_value_t = false, requires = "flashblocks_urls")] pub flashblock_consensus: bool, /// If true, initialize external-proofs exex to save and serve trie nodes to provide proofs @@ -159,7 +162,7 @@ impl Default for RollupArgs { sequencer_headers: Vec::new(), historical_rpc: None, min_suggested_priority_fee: 1_000_000, - flashblocks_url: None, + flashblocks_urls: Vec::new(), flashblock_consensus: false, proofs_history: false, proofs_history_storage_path: None, diff --git a/rust/op-reth/crates/node/src/node.rs b/rust/op-reth/crates/node/src/node.rs index cbcd898f40931..3c1f41a21f19d 100644 --- a/rust/op-reth/crates/node/src/node.rs +++ b/rust/op-reth/crates/node/src/node.rs @@ -193,7 +193,7 @@ impl OpNode { .with_enable_tx_conditional(self.args.enable_tx_conditional) .with_min_suggested_priority_fee(self.args.min_suggested_priority_fee) .with_historical_rpc(self.args.historical_rpc.clone()) - .with_flashblocks(self.args.flashblocks_url.clone()) + .with_flashblocks(self.args.flashblocks_urls.clone()) .with_flashblock_consensus(self.args.flashblock_consensus) } @@ -699,8 +699,8 @@ pub struct OpAddOnsBuilder { rpc_middleware: RpcMiddleware, /// Optional tokio runtime to use for the RPC server. tokio_runtime: Option, - /// A URL pointing to a secure websocket service that streams out flashblocks. - flashblocks_url: Option, + /// URL(s) pointing to secure websocket services that stream out flashblocks. + flashblocks_urls: Vec, /// Enable flashblock consensus client to drive chain forward. flashblock_consensus: bool, } @@ -718,7 +718,7 @@ impl Default for OpAddOnsBuilder { _nt: PhantomData, rpc_middleware: Identity::new(), tokio_runtime: None, - flashblocks_url: None, + flashblocks_urls: Vec::new(), flashblock_consensus: false, } } @@ -787,7 +787,7 @@ impl OpAddOnsBuilder { min_suggested_priority_fee, tokio_runtime, _nt, - flashblocks_url, + flashblocks_urls, flashblock_consensus, .. } = self; @@ -802,14 +802,14 @@ impl OpAddOnsBuilder { _nt, rpc_middleware, tokio_runtime, - flashblocks_url, + flashblocks_urls, flashblock_consensus, } } - /// With a URL pointing to a flashblocks secure websocket subscription. - pub fn with_flashblocks(mut self, flashblocks_url: Option) -> Self { - self.flashblocks_url = flashblocks_url; + /// With URL(s) to flashblocks secure websocket subscription(s). + pub fn with_flashblocks(mut self, flashblocks_urls: Vec) -> Self { + self.flashblocks_urls = flashblocks_urls; self } @@ -842,7 +842,7 @@ impl OpAddOnsBuilder { historical_rpc, rpc_middleware, tokio_runtime, - flashblocks_url, + flashblocks_urls, flashblock_consensus, .. } = self; @@ -853,7 +853,7 @@ impl OpAddOnsBuilder { .with_sequencer(sequencer_url.clone()) .with_sequencer_headers(sequencer_headers.clone()) .with_min_suggested_priority_fee(min_suggested_priority_fee) - .with_flashblocks(flashblocks_url) + .with_flashblocks(flashblocks_urls) .with_flashblock_consensus(flashblock_consensus), PVB::default(), EB::default(), diff --git a/rust/op-reth/crates/rpc/src/eth/mod.rs b/rust/op-reth/crates/rpc/src/eth/mod.rs index d53b11d8bf0d9..03dc549481c1a 100644 --- a/rust/op-reth/crates/rpc/src/eth/mod.rs +++ b/rust/op-reth/crates/rpc/src/eth/mod.rs @@ -29,7 +29,8 @@ use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; use reth_optimism_flashblocks::{ FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx, FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblockCachedReceipt, - FlashblocksListeners, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream, + FlashblocksListeners, MultiSourceFlashBlockStream, PendingBlockRx, PendingFlashBlock, + WsFlashBlockStream, }; use reth_primitives_traits::NodePrimitives; use reth_rpc::eth::core::EthApiInner; @@ -446,15 +447,15 @@ pub struct OpEthApiBuilder { sequencer_headers: Vec, /// Minimum suggested priority fee (tip) min_suggested_priority_fee: u64, - /// A URL pointing to a secure websocket connection (wss) that streams out [flashblocks]. + /// URL(s) pointing to secure websocket connections that stream out [flashblocks]. /// /// [flashblocks]: reth_optimism_flashblocks - flashblocks_url: Option, + flashblocks_urls: Vec, /// Enable flashblock consensus client to drive the chain forward. /// /// When enabled, flashblock sequences are submitted to the engine API via /// `newPayload` and `forkchoiceUpdated` calls, advancing the canonical chain state. - /// Requires `flashblocks_url` to be set. + /// Requires `flashblocks_urls` to be set. flashblock_consensus: bool, /// Marker for network types. _nt: PhantomData, @@ -466,7 +467,7 @@ impl Default for OpEthApiBuilder { sequencer_url: None, sequencer_headers: Vec::new(), min_suggested_priority_fee: 1_000_000, - flashblocks_url: None, + flashblocks_urls: Vec::new(), flashblock_consensus: false, _nt: PhantomData, } @@ -480,7 +481,7 @@ impl OpEthApiBuilder { sequencer_url: None, sequencer_headers: Vec::new(), min_suggested_priority_fee: 1_000_000, - flashblocks_url: None, + flashblocks_urls: Vec::new(), flashblock_consensus: false, _nt: PhantomData, } @@ -504,9 +505,9 @@ impl OpEthApiBuilder { self } - /// With a subscription to flashblocks secure websocket connection. - pub fn with_flashblocks(mut self, flashblocks_url: Option) -> Self { - self.flashblocks_url = flashblocks_url; + /// With subscription(s) to flashblocks secure websocket connection(s). + pub fn with_flashblocks(mut self, flashblocks_urls: Vec) -> Self { + self.flashblocks_urls = flashblocks_urls; self } @@ -548,7 +549,7 @@ where sequencer_url, sequencer_headers, min_suggested_priority_fee, - flashblocks_url, + flashblocks_urls, flashblock_consensus, .. } = self; @@ -566,11 +567,15 @@ where None }; - let flashblocks = if let Some(ws_url) = flashblocks_url { - info!(target: "reth:cli", %ws_url, "Launching flashblocks service"); + let flashblocks = if flashblocks_urls.is_empty() { + None + } else { + info!(target: "reth:cli", ?flashblocks_urls, "Launching flashblocks service"); let (tx, pending_rx) = watch::channel(None); - let stream = WsFlashBlockStream::new(ws_url); + let streams: Vec<_> = + flashblocks_urls.into_iter().map(WsFlashBlockStream::new).collect(); + let stream = MultiSourceFlashBlockStream::new(streams); let service = FlashBlockService::new( stream, ctx.components.evm_config().clone(), @@ -600,8 +605,6 @@ where in_progress_rx, received_flashblocks, )) - } else { - None }; let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner(); From eb9a7938aaaabfe6d1557896a99884711b2da0a5 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Fri, 27 Feb 2026 17:55:05 +0800 Subject: [PATCH 2/4] fix: more idiomatic rust --- rust/op-reth/crates/flashblocks/src/ws/multi.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/rust/op-reth/crates/flashblocks/src/ws/multi.rs b/rust/op-reth/crates/flashblocks/src/ws/multi.rs index 3cb31d31acbb1..ab04b0816b4c0 100644 --- a/rust/op-reth/crates/flashblocks/src/ws/multi.rs +++ b/rust/op-reth/crates/flashblocks/src/ws/multi.rs @@ -93,14 +93,10 @@ where } if let Some(backoff) = &mut state.backoff { - match backoff.as_mut().poll(cx) { - Poll::Ready(()) => { - state.backoff = None; - } - Poll::Pending => { - continue; - } + if backoff.as_mut().poll(cx).is_pending() { + continue; } + state.backoff = None; } match Pin::new(stream).poll_next(cx) { From 0a2b5c7f954d5b36af2c1a63fd6daeb78848d966 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Fri, 27 Feb 2026 18:07:22 +0800 Subject: [PATCH 3/4] clean up --- rust/op-reth/crates/flashblocks/src/ws/multi.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/rust/op-reth/crates/flashblocks/src/ws/multi.rs b/rust/op-reth/crates/flashblocks/src/ws/multi.rs index ab04b0816b4c0..459c128fb665e 100644 --- a/rust/op-reth/crates/flashblocks/src/ws/multi.rs +++ b/rust/op-reth/crates/flashblocks/src/ws/multi.rs @@ -143,11 +143,10 @@ where %err, "Source error, backing off" ); - state.backoff = Some(Box::pin(tokio::time::sleep(PER_SOURCE_BACKOFF))); - // Register waker on the new backoff timer - if let Some(backoff) = &mut state.backoff { - let _ = backoff.as_mut().poll(cx); - } + let mut backoff = Box::pin(tokio::time::sleep(PER_SOURCE_BACKOFF)); + // Register waker so we're notified when the backoff expires + let _ = backoff.as_mut().poll(cx); + state.backoff = Some(backoff); } Poll::Ready(None) => { state.terminated = true; From e9c20bc0c77592945607fb85c8b3b3fde68d33a5 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Fri, 27 Feb 2026 18:44:42 +0800 Subject: [PATCH 4/4] chore --- rust/op-reth/crates/flashblocks/src/ws/multi.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/op-reth/crates/flashblocks/src/ws/multi.rs b/rust/op-reth/crates/flashblocks/src/ws/multi.rs index 459c128fb665e..eed1178b8d4f1 100644 --- a/rust/op-reth/crates/flashblocks/src/ws/multi.rs +++ b/rust/op-reth/crates/flashblocks/src/ws/multi.rs @@ -30,7 +30,8 @@ struct SourceState { /// A multi-source flashblock stream that wraps multiple inner streams. /// /// Deduplicates flashblocks by `(payload_id, index)`. The first-arriving -/// flashblock for each key wins. Subsequent duplicates are cross-validated: +/// flashblock for each key wins. Subsequent duplicates are cross-validated +/// by comparing block hashes, with mismatches logged as warnings. /// /// Each source has independent error backoff. When a source errors, it backs /// off for [`PER_SOURCE_BACKOFF`] before being polled again. Other sources @@ -103,7 +104,7 @@ where Poll::Ready(Some(Ok(flashblock))) => { // Detect new block: index 0 with a new payload_id resets dedup state if flashblock.index == 0 && - this.current_payload_id.as_ref() != Some(&flashblock.payload_id) + this.current_payload_id != Some(flashblock.payload_id) { this.seen.clear(); this.current_payload_id = Some(flashblock.payload_id);