diff --git a/chain-signatures/node/src/indexer_eth/mod.rs b/chain-signatures/node/src/indexer_eth/mod.rs index 704743fb..d56aa478 100644 --- a/chain-signatures/node/src/indexer_eth/mod.rs +++ b/chain-signatures/node/src/indexer_eth/mod.rs @@ -8,7 +8,8 @@ use crate::metrics::requests::{record_request_latency, SignRequestStep}; use crate::protocol::{Chain, IndexedSignRequest}; use crate::respond_bidirectional::CompletedTx; use crate::sign_bidirectional::SignStatus; -use crate::stream::{ChainEvent, ChainStream, ExecutionOutcome}; +use crate::stream::{ChainBufferedStream, ChainEvent, ChainIndexer, ChainStream, ExecutionOutcome}; +use async_trait::async_trait; use alloy::eips::BlockNumberOrTag; use alloy::primitives::hex::{self, ToHexExt}; @@ -20,13 +21,11 @@ use k256::{AffinePoint as K256AffinePoint, EncodedPoint, FieldBytes, Scalar}; use mpc_crypto::{kdf::derive_epsilon_eth, ScalarExt as _}; use mpc_primitives::{SignArgs, SignId, Signature as MpcSignature, LATEST_MPC_KEY_VERSION}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; use std::fmt; use std::str::FromStr; use std::sync::Arc; use std::sync::LazyLock; use tokio::sync::mpsc; -use tokio::task::JoinHandle; use tokio::time::Duration; pub(crate) static MAX_SECP256K1_SCALAR: LazyLock = LazyLock::new(|| { @@ -42,49 +41,23 @@ pub(crate) static MAX_SECP256K1_SCALAR: LazyLock = LazyLock::new(|| { // This is the maximum number of blocks that Helios can look back to const MAX_CATCHUP_BLOCKS: u64 = 8191; -const MAX_BLOCKS_TO_PROCESS: usize = 10000; +const MAX_LIVE_BLOCK_BUFFER: usize = 16384; -fn blocks_to_process_channel() -> (mpsc::Sender, mpsc::Receiver) { - mpsc::channel(MAX_BLOCKS_TO_PROCESS) -} - -const MAX_INDEXED_REQUESTS: usize = 1024; - -fn indexed_channel() -> ( - mpsc::Sender, - mpsc::Receiver, -) { - mpsc::channel(MAX_INDEXED_REQUESTS) -} - -const MAX_FAILED_BLOCKS: usize = 1024; - -fn failed_blocks_channel() -> ( +fn live_blocks_channel() -> ( mpsc::Sender, mpsc::Receiver, ) { - mpsc::channel(MAX_FAILED_BLOCKS) -} - -const MAX_FINALIZED_BLOCKS: usize = 1024; - -fn finalized_block_channel() -> (mpsc::Sender, mpsc::Receiver) { - mpsc::channel(MAX_FINALIZED_BLOCKS) + mpsc::channel(MAX_LIVE_BLOCK_BUFFER) } type BlockNumber = u64; -pub enum BlockToProcess { - Catchup(BlockNumber), - NewBlock(Box), -} - -#[derive(Clone)] pub struct BlockAndRequests { block_number: u64, block_hash: alloy::primitives::B256, indexed_requests: Vec, respond_logs: Vec, + execution_events: Vec, } impl BlockAndRequests { @@ -93,16 +66,35 @@ impl BlockAndRequests { block_hash: alloy::primitives::B256, indexed_requests: Vec, respond_logs: Vec, + execution_events: Vec, ) -> Self { Self { block_number, block_hash, indexed_requests, respond_logs, + execution_events, } } } +pub struct EthereumBufferedStream { + live_blocks_rx: mpsc::Receiver, +} + +#[async_trait] +impl ChainBufferedStream for EthereumBufferedStream { + type Block = alloy::rpc::types::Block; + + async fn initial(&mut self) -> Option { + self.live_blocks_rx.recv().await + } + + async fn next(&mut self) -> Option { + self.live_blocks_rx.recv().await + } +} + #[derive(Clone)] pub struct EthConfig { /// The ethereum account secret key used to sign eth respond txn. @@ -722,213 +714,145 @@ impl EthereumClient { pub struct EthereumIndexer { eth: EthConfig, backlog: Backlog, - client: EthereumClient, + client: Arc, + events_tx: mpsc::Sender, + contract_address: Address, } impl EthereumIndexer { - pub async fn new(eth: EthConfig, backlog: Backlog) -> anyhow::Result { - let client = EthereumClient::new(eth.clone()).await?; + pub async fn new( + eth: EthConfig, + backlog: Backlog, + events_tx: mpsc::Sender, + ) -> anyhow::Result { + let client = Arc::new(EthereumClient::new(eth.clone()).await?); + let contract_address = format!("0x{}", eth.contract_address); + let contract_address = Address::from_str(&contract_address).map_err(|_| { + anyhow::anyhow!("failed to parse ethereum contract address: {contract_address}") + })?; Ok(Self { eth, backlog, client, + events_tx, + contract_address, }) } - pub async fn run(self, events_tx: mpsc::Sender) { - let backlog = self.backlog; - let eth = self.eth; - let client = Arc::new(self.client); - - tracing::info!("running ethereum indexer"); - let Ok(contract_address) = Address::from_str(&format!("0x{}", eth.contract_address)) else { - tracing::error!("Failed to parse contract address: {}", eth.contract_address); - return; - }; - let (blocks_failed_send, blocks_failed_recv) = failed_blocks_channel(); - - let (requests_indexed_send, requests_indexed_recv) = indexed_channel(); - - let (finalized_block_send, finalized_block_recv) = finalized_block_channel(); - - let (blocks_to_process_send, mut blocks_to_process_recv) = blocks_to_process_channel(); - - let client_clone = Arc::clone(&client); - let finalized_block_send_clone = finalized_block_send.clone(); - let refresh_interval = eth.refresh_finalized_interval; - tokio::spawn(async move { - tracing::info!("Spawned task to refresh the latest finalized block"); - Self::refresh_finalized_block( - client_clone, - finalized_block_send_clone, - refresh_interval, - ) - .await; - }); - - let client_clone = Arc::clone(&client); - let optimistic_requests = eth.optimistic_requests; - tokio::spawn(Self::send_requests_when_final( - client_clone, - requests_indexed_recv, - finalized_block_recv, - events_tx.clone(), - optimistic_requests, - )); - - tokio::spawn(Self::retry_failed_blocks( - Arc::clone(&client), - blocks_failed_recv, - blocks_failed_send.clone(), - contract_address, - requests_indexed_send.clone(), - backlog.clone(), - events_tx.clone(), - )); - - let last_processed_block = backlog.processed_block(Chain::Ethereum).await; - let mut expected_catchup_blocks = 0usize; - let mut processed_catchup_blocks = HashSet::new(); - let mut catchup_completed_emitted = false; - - let blocks_to_process_send_clone = blocks_to_process_send.clone(); - if let Some(last_processed_block) = last_processed_block { - match Self::catchup_end_block_number(Arc::clone(&client)).await { - Some(end_block_number) => { - expected_catchup_blocks = end_block_number - .saturating_sub(last_processed_block) - .saturating_add(1) as usize; - Self::add_catchup_blocks_to_process( - blocks_to_process_send_clone, - last_processed_block, - end_block_number, - ) - .await - } - None => { - tracing::error!("Failed to get catchup end block number"); - } - } - } - - if expected_catchup_blocks == 0 { - if let Err(err) = events_tx.send(ChainEvent::CatchupCompleted).await { - tracing::warn!(?err, "failed to emit ethereum catchup completion event"); - } else { - catchup_completed_emitted = true; - } - } - - tokio::spawn(Self::add_new_block_to_process( - Arc::clone(&client), - blocks_to_process_send.clone(), - )); + async fn buffer_live_blocks( + client: Arc, + live_blocks: mpsc::Sender, + ) { + tracing::info!("buffering ethereum live blocks"); + let mut next_block_number: Option = None; - let mut interval = tokio::time::interval(Duration::from_millis(200)); - let requests_indexed_send_clone = requests_indexed_send.clone(); loop { - let Some(block_to_process) = blocks_to_process_recv.recv().await else { - interval.tick().await; + let Some(latest_block_number) = client.get_latest_block_number().await else { + tokio::time::sleep(Duration::from_millis(500)).await; continue; }; - let (block, is_catchup) = match block_to_process { - BlockToProcess::Catchup(block_number) => { - let block = client - .get_block(alloy::rpc::types::BlockId::Number( - BlockNumberOrTag::Number(block_number), - )) - .await; - if let Some(block) = block { - (block, true) - } else { - tracing::warn!("Block {block_number} not found from Ethereum client"); - continue; - } - } - BlockToProcess::NewBlock(block) => ((*block).clone(), false), - }; - let block_number = block.header.number; - if let Err(err) = Self::process_block( - client.clone(), - block.clone(), - contract_address, - requests_indexed_send_clone.clone(), - backlog.clone(), - events_tx.clone(), - ) - .await - { - tracing::warn!( - "Eth indexer failed to process block number {block_number}: {err:?}" - ); - Self::add_failed_block(blocks_failed_send.clone(), block).await; + + let mut block_number = next_block_number.unwrap_or(latest_block_number); + if block_number > latest_block_number { + tokio::time::sleep(Duration::from_millis(500)).await; continue; } - if block_number % 10 == 0 { - if is_catchup { - tracing::info!("Processed catchup block number {block_number}"); - } else { - tracing::info!("Processed new block number {block_number}"); - } - } - if is_catchup && !catchup_completed_emitted { - processed_catchup_blocks.insert(block_number); - if processed_catchup_blocks.len() >= expected_catchup_blocks { - if let Err(err) = events_tx.send(ChainEvent::CatchupCompleted).await { - tracing::warn!(?err, "failed to emit ethereum catchup completion event"); - } else { - catchup_completed_emitted = true; - } + while block_number <= latest_block_number { + let Some(block) = client + .get_block(alloy::rpc::types::BlockId::Number( + BlockNumberOrTag::Number(block_number), + )) + .await + else { + tracing::warn!(block_number, "ethereum live block not yet available"); + break; + }; + + if let Err(err) = live_blocks.send(block).await { + tracing::warn!(?err, block_number, "failed to buffer ethereum live block"); + return; } + + next_block_number = Some(block_number.saturating_add(1)); + block_number = block_number.saturating_add(1); } - crate::metrics::indexers::LATEST_BLOCK_NUMBER - .with_label_values(&[Chain::Ethereum.as_str(), "indexed"]) - .set(block_number as i64); + tokio::time::sleep(Duration::from_millis(500)).await; } } - async fn add_new_block_to_process( - client: Arc, - blocks_to_process: mpsc::Sender, - ) { - tracing::info!("Adding new blocks to process..."); - let mut current_block = 0; - loop { - let Some(latest_block) = client - .get_block(alloy::rpc::types::BlockId::Number(BlockNumberOrTag::Latest)) - .await - else { - continue; - }; - let block_number = latest_block.header.number; - if block_number <= current_block { - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - } - if let Err(err) = blocks_to_process - .send(BlockToProcess::NewBlock(Box::new(latest_block))) - .await - { - tracing::warn!("Failed to send new block to process: {err:?}"); - } - current_block = block_number; + fn catchup_start_block_number( + last_processed_block: Option, + anchor_height: BlockNumber, + ) -> BlockNumber { + let requested_start = last_processed_block + .map(|height| height.saturating_add(1)) + .unwrap_or(anchor_height); + + let catchup_end = anchor_height.saturating_sub(1); + let oldest_supported = catchup_end.saturating_sub(MAX_CATCHUP_BLOCKS); + + if requested_start < oldest_supported { + tracing::warn!( + requested_start, + anchor_height, + oldest_supported, + "ethereum catchup start is older than supported range; clamping" + ); + oldest_supported + } else { + requested_start } } - async fn catchup_end_block_number(client: Arc) -> Option { - client.get_latest_block_number().await + + async fn process_height(&self, block_number: u64) -> anyhow::Result<()> { + let Some(block) = self + .client + .get_block(alloy::rpc::types::BlockId::Number( + BlockNumberOrTag::Number(block_number), + )) + .await + else { + anyhow::bail!("ethereum block {block_number} not found"); + }; + + self.process_live_block(block).await + } + + async fn process_live_block(&self, block: alloy::rpc::types::Block) -> anyhow::Result<()> { + let block_number = block.header.number; + + let processed = Self::process_block( + self.client.clone(), + block, + self.contract_address, + self.backlog.clone(), + ) + .await?; + + Self::emit_processed_block( + self.client.clone(), + self.events_tx.clone(), + &self.eth, + processed, + ) + .await?; + + crate::metrics::indexers::LATEST_BLOCK_NUMBER + .with_label_values(&[Chain::Ethereum.as_str(), "indexed"]) + .set(block_number as i64); + + Ok(()) } async fn process_block( client: Arc, block: alloy::rpc::types::Block, contract_address: Address, - requests_indexed: mpsc::Sender, backlog: Backlog, - events_tx: mpsc::Sender, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let block_number = block.header.number; let block_hash = block.header.hash; let block_timestamp = block.header.timestamp; @@ -998,11 +922,6 @@ impl EthereumIndexer { block_receipts.clone(), ) .await?; - for ev in exec_events { - if let Err(err) = events_tx.send(ev).await { - tracing::error!(?err, "failed to emit ExecutionConfirmed event"); - } - } for _request in &sign_requests { record_request_latency( @@ -1015,17 +934,13 @@ impl EthereumIndexer { // Always forward the processed block to the "finalization" stage so it can emit // `ChainEvent::Block` even when there are no relevant contract logs. - requests_indexed - .send(BlockAndRequests::new( - block_number, - block_hash, - sign_requests, - respond_logs, - )) - .await - .map_err(|err| anyhow::anyhow!("Failed to send indexed requests: {:?}", err))?; - - Ok(()) + Ok(BlockAndRequests::new( + block_number, + block_hash, + sign_requests, + respond_logs, + exec_events, + )) } async fn collect_execution_confirmations( @@ -1153,207 +1068,174 @@ impl EthereumIndexer { Ok(events) } - /// Sends a request to the sign queue when the block where the request is in is finalized. - async fn send_requests_when_final( + /// Emits the processed block in-order once the configured buffer policy allows it. + async fn emit_processed_block( client: Arc, - mut requests_indexed: mpsc::Receiver, - mut finalized_block_rx: mpsc::Receiver, events_tx: mpsc::Sender, - optimistic_requests: bool, - ) { - let mut finalized_block_number: Option = None; - - loop { - let Some(BlockAndRequests { + eth: &EthConfig, + BlockAndRequests { + block_number, + block_hash, + indexed_requests, + respond_logs, + execution_events, + }: BlockAndRequests, + ) -> anyhow::Result<()> { + if !eth.optimistic_requests { + Self::wait_for_finalized_block( + Arc::clone(&client), + eth.refresh_finalized_interval, block_number, - block_hash, - indexed_requests, - respond_logs, - }) = requests_indexed.recv().await - else { - tracing::error!("Failed to receive indexed requests"); - return; - }; - - if !optimistic_requests { - // Wait for finalized block if needed - while finalized_block_number.is_none_or(|n| block_number > n) { - let Some(new_finalized_block) = finalized_block_rx.recv().await else { - tracing::error!("Failed to receive finalized blocks"); - return; - }; - finalized_block_number.replace(new_finalized_block); - } - } - - // Verify block hash and send requests - let block = client - .as_ref() - .get_block(alloy::rpc::types::BlockId::Number( - BlockNumberOrTag::Number(block_number), - )) - .await; + ) + .await?; + } - let Some(block) = block else { - tracing::warn!("Block {block_number} not found from Ethereum client, skipping this block and its requests"); - continue; - }; + let Some(block) = client + .as_ref() + .get_block(alloy::rpc::types::BlockId::Number( + BlockNumberOrTag::Number(block_number), + )) + .await + else { + anyhow::bail!("ethereum block {block_number} not found during emission"); + }; - if block.header.hash == block_hash { - tracing::info!("Block {block_number} is finalized!"); + if block.header.hash != block_hash { + anyhow::bail!( + "block {block_number} hash mismatch: expected {block_hash:?}, got {:?}", + block.header.hash + ); + } - for req in indexed_requests.clone() { - if let Err(err) = events_tx.send(ChainEvent::SignRequest(req)).await { - tracing::error!(?err, "failed to emit SignRequest event"); - } - } + for event in execution_events { + events_tx.send(event).await.map_err(|err| { + anyhow::anyhow!("failed to emit ExecutionConfirmed event: {err:?}") + })?; + } - if !respond_logs.is_empty() { - emit_respond_events(&respond_logs, events_tx.clone()).await; - } + for req in indexed_requests { + events_tx + .send(ChainEvent::SignRequest(req)) + .await + .map_err(|err| anyhow::anyhow!("failed to emit SignRequest event: {err:?}"))?; + } - if let Err(err) = events_tx.send(ChainEvent::Block(block_number)).await { - tracing::error!(?err, "failed to emit block event"); - } - } else { - // no special handling for chain reorg, just log the error - // This is because when such chain reorg happens, the new canonical chain will have already been emitted by helios's block header stream, and we can safely skip this block here. - tracing::error!( - "Block {block_number} hash mismatch: expected {block_hash:?}, got {:?}. Chain re-orged.", - block.header.hash - ); - } + if !respond_logs.is_empty() { + emit_respond_events(&respond_logs, events_tx.clone()).await; } - } - #[allow(clippy::too_many_arguments)] - async fn retry_failed_blocks( - client: Arc, - mut blocks_failed_rx: mpsc::Receiver, - blocks_failed_tx: mpsc::Sender, - contract_address: Address, - requests_indexed: mpsc::Sender, - backlog: Backlog, - events_tx: mpsc::Sender, - ) { - loop { - let Some(block) = blocks_failed_rx.recv().await else { - tracing::warn!("Failed to receive block and requests from requests_indexed"); - break; - }; - let block_number = block.header.number; - if let Err(err) = Self::process_block( - client.clone(), - block.clone(), - contract_address, - requests_indexed.clone(), - backlog.clone(), - events_tx.clone(), - ) + events_tx + .send(ChainEvent::Block(block_number)) .await - { - tracing::warn!("Retry failed for block {block_number}: {err:?}"); - Self::add_failed_block(blocks_failed_tx.clone(), block).await; - } else { - tracing::info!("Successfully retried block: {block_number}"); - } - } - } + .map_err(|err| anyhow::anyhow!("failed to emit block event: {err:?}"))?; - async fn add_failed_block( - blocks_failed: mpsc::Sender, - block: alloy::rpc::types::Block, - ) { - blocks_failed.send(block).await.unwrap_or_else(|err| { - tracing::warn!("Failed to send failed block: {:?}", err); - }); + Ok(()) } - /// Polls for the latest finalized block and update finalized block channel. - async fn refresh_finalized_block( + async fn wait_for_finalized_block( client: Arc, - finalized_block_send: mpsc::Sender, - refresh_finalized_interval: u64, - ) { - let mut interval = tokio::time::interval(Duration::from_millis(refresh_finalized_interval)); - let mut final_block_number: Option = None; + retry_interval: u64, + block_number: BlockNumber, + ) -> anyhow::Result<()> { + let mut last_final_block_number: Option = None; loop { - interval.tick().await; - tracing::debug!("Refreshing finalized epoch"); - - let new_finalized_block = match client + let Some(finalized_block) = client .as_ref() .get_block(alloy::rpc::types::BlockId::Number( BlockNumberOrTag::Finalized, )) .await - { - Some(block) => block, - None => { - tracing::warn!("Finalized block not found from Ethereum client"); - continue; - } + else { + tracing::warn!(block_number, "finalized ethereum block not found; retrying"); + tokio::time::sleep(Duration::from_millis(retry_interval)).await; + continue; }; - let new_final_block_number = new_finalized_block.header.number; - tracing::debug!( - "New finalized block number: {new_final_block_number}, last finalized block number: {final_block_number:?}" - ); - - if final_block_number.is_none_or(|n| new_final_block_number > n) { - tracing::info!("Found new finalized block!"); - if let Err(err) = finalized_block_send.send(new_final_block_number).await { - tracing::warn!("Failed to send finalized block: {err:?}"); - continue; - } - final_block_number.replace(new_final_block_number); + let new_final_block_number = finalized_block.header.number; + if last_final_block_number.is_none_or(|n| new_final_block_number > n) { + tracing::debug!( + new_final_block_number, + last_final_block_number, + "New finalized block number" + ); + last_final_block_number.replace(new_final_block_number); crate::metrics::indexers::LATEST_BLOCK_NUMBER .with_label_values(&[Chain::Ethereum.as_str(), "finalized"]) .set(new_final_block_number as i64); - continue; } - let Some(last_final_block_number) = final_block_number else { + let Some(last_final_block_number) = last_final_block_number else { continue; }; if new_final_block_number < last_final_block_number { tracing::warn!( - "New finalized block number overflowed range of u64 and has wrapped around!" + new_final_block_number, + last_final_block_number, + "new finalized block number overflowed range of u64 and has wrapped around!" ); } - if last_final_block_number == new_final_block_number { - tracing::debug!("No new finalized block"); + if new_final_block_number == last_final_block_number { + tracing::debug!(new_final_block_number, "no new finalized block"); } + + // If the finalized block number has advanced past the block we're waiting for, + // we can proceed with emitting it. + if new_final_block_number >= block_number { + return Ok(()); + }; + + tokio::time::sleep(Duration::from_millis(retry_interval)).await; } } +} - async fn add_catchup_blocks_to_process( - blocks_to_process: mpsc::Sender, - start_block_number: u64, - end_block_number: u64, - ) { - // helios can only go back maximum MAX_CATCHUP_BLOCKS blocks, so we need to adjust the start block number if it's too far behind - let helios_oldest_block_number = end_block_number.saturating_sub(MAX_CATCHUP_BLOCKS); - let start_block_number = if start_block_number < helios_oldest_block_number { - tracing::warn!( - "Start block number {start_block_number} is too far behind the latest block {end_block_number}, adjusting to {helios_oldest_block_number}" - ); - helios_oldest_block_number - } else { - start_block_number - }; +#[async_trait] +impl ChainIndexer for EthereumIndexer { + type BufferedStream = EthereumBufferedStream; - for block_number in start_block_number..=end_block_number { - if let Err(err) = blocks_to_process - .send(BlockToProcess::Catchup(block_number)) - .await - { - tracing::warn!("Failed to send block to process: {err:?}"); - } + async fn livestream(&mut self) -> anyhow::Result> { + let (live_blocks_tx, live_blocks_rx) = live_blocks_channel(); + tokio::spawn(EthereumIndexer::buffer_live_blocks( + self.client.clone(), + live_blocks_tx, + )); + + Ok(Some(EthereumBufferedStream { live_blocks_rx })) + } + + fn buffered_item_height(block: &::Block) -> u64 { + block.header.number + } + + async fn catchup_range(&mut self, anchor_height: u64) -> std::ops::Range { + let catchup_start = EthereumIndexer::catchup_start_block_number( + self.backlog.processed_block(Chain::Ethereum).await, + anchor_height, + ); + + catchup_start..anchor_height + } + + async fn process_catchup_on_height(&mut self, height: u64) -> anyhow::Result<()> { + if height.is_multiple_of(10) { + tracing::info!(height, "processed ethereum catchup height attempt"); } + + self.process_height(height).await + } + + async fn process_buffered_block( + &mut self, + block: ::Block, + ) -> anyhow::Result<()> { + self.process_live_block(block).await + } + + fn retry_delay(&self) -> Duration { + Duration::from_millis(500) } } @@ -1361,9 +1243,8 @@ impl EthereumIndexer { /// Construction is side-effect free; the shared `run_stream()` loop calls /// `start()` after recovery has completed. pub struct EthereumStream { - events_rx: mpsc::Receiver, - indexer: Option<(EthereumIndexer, mpsc::Sender)>, - tasks: Vec>, + events_rx: Option>, + start_state: Option, } impl EthereumStream { @@ -1382,46 +1263,60 @@ impl EthereumStream { "creating ethereum indexer stream" ); - let indexer = EthereumIndexer::new(eth, backlog).await?; - let (events_tx, events_rx) = crate::stream::channel(); + let indexer = EthereumIndexer::new(eth, backlog, events_tx).await?; Ok(Self { - events_rx, - indexer: Some((indexer, events_tx)), - tasks: Vec::new(), + events_rx: Some(events_rx), + start_state: Some(indexer), }) } +} - pub fn start(&mut self) { - let Some((indexer, events_tx)) = self.indexer.take() else { - return; - }; - - let t_indexer: JoinHandle<()> = tokio::spawn(async move { - indexer.run(events_tx).await; - }); +#[async_trait] +impl ChainStream for EthereumStream { + const CHAIN: Chain = Chain::Ethereum; + type Indexer = EthereumIndexer; - self.tasks.push(t_indexer); + async fn start(&mut self) -> anyhow::Result { + self.start_state + .take() + .ok_or_else(|| anyhow::anyhow!("ethereum stream already started")) } -} -impl Drop for EthereumStream { - fn drop(&mut self) { - for t in &self.tasks { - t.abort(); + async fn next_event(&mut self) -> Option { + match self.events_rx.as_mut() { + Some(rx) => rx.recv().await, + None => None, } } } +#[cfg(test)] +mod tests { + use super::EthereumIndexer; + + #[test] + fn catchup_starts_after_processed_height() { + assert_eq!( + EthereumIndexer::catchup_start_block_number(Some(41), 50), + 42 + ); + } -impl ChainStream for EthereumStream { - const CHAIN: Chain = Chain::Ethereum; - - async fn start(&mut self) { - self.start(); + #[test] + fn catchup_without_checkpoint_starts_from_anchor() { + assert_eq!(EthereumIndexer::catchup_start_block_number(None, 50), 50); } - async fn next_event(&mut self) -> Option { - self.events_rx.recv().await + #[test] + fn catchup_start_is_clamped_to_supported_window() { + let anchor_height = 10_000; + let catchup_end = anchor_height - 1; + let expected_oldest = catchup_end - super::MAX_CATCHUP_BLOCKS; + + assert_eq!( + EthereumIndexer::catchup_start_block_number(Some(1), anchor_height), + expected_oldest, + ); } } diff --git a/chain-signatures/node/src/indexer_sol.rs b/chain-signatures/node/src/indexer_sol.rs index 3614fd24..7ba4c5bc 100644 --- a/chain-signatures/node/src/indexer_sol.rs +++ b/chain-signatures/node/src/indexer_sol.rs @@ -1,9 +1,10 @@ use crate::protocol::{Chain, IndexedSignRequest}; use crate::sign_bidirectional::hash_rlp_data; use crate::stream::ops::{SignatureEvent, SignatureEventBox}; -use crate::stream::{ChainEvent, ChainStream}; +use crate::stream::{ChainEvent, ChainStream, DisabledChainIndexer}; use crate::util::retry::{retry_async, RetryConfig, RetryError, RetryReason}; +use async_trait::async_trait; use std::collections::HashMap; use std::fmt; use std::str::FromStr; @@ -293,7 +294,7 @@ type Result = anyhow::Result; /// Solana stream that implements the new ChainStream abstraction pub struct SolanaStream { - rx: mpsc::Receiver, + rx: Option>, start_state: Option, tasks: Vec>, } @@ -328,7 +329,7 @@ impl SolanaStream { let (tx, rx) = crate::stream::channel(); Some(SolanaStream { - rx, + rx: Some(rx), start_state: Some(SolanaStreamStartState { program_id, rpc_http_url: sol.rpc_http_url.clone(), @@ -340,12 +341,14 @@ impl SolanaStream { } } +#[async_trait] impl ChainStream for SolanaStream { const CHAIN: Chain = Chain::Solana; + type Indexer = DisabledChainIndexer; - async fn start(&mut self) { + async fn start(&mut self) -> anyhow::Result { let Some(start_state) = self.start_state.take() else { - return; + anyhow::bail!("solana stream already started"); }; self.tasks.push(spawn_cpi_sign_events( @@ -360,10 +363,15 @@ impl ChainStream for SolanaStream { start_state.rpc_ws_url, start_state.tx, )); + + Ok(DisabledChainIndexer) } async fn next_event(&mut self) -> Option { - self.rx.recv().await + match self.rx.as_mut() { + Some(rx) => rx.recv().await, + None => None, + } } } diff --git a/chain-signatures/node/src/stream/mod.rs b/chain-signatures/node/src/stream/mod.rs index b549964c..fa7fab06 100644 --- a/chain-signatures/node/src/stream/mod.rs +++ b/chain-signatures/node/src/stream/mod.rs @@ -11,7 +11,11 @@ use crate::stream::ops::{ RespondBidirectionalEvent, SignatureRespondedEvent, }; +use async_trait::async_trait; +use std::time::Duration; +use std::{ops::Range, vec::Vec}; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::sync::watch; pub mod ops; @@ -29,9 +33,6 @@ pub enum ChainEvent { Respond(SignatureRespondedEvent), RespondBidirectional(RespondBidirectionalEvent), - /// The stream has finished replaying catch-up data for this chain. - CatchupCompleted, - /// Block height indicating the client has observed/processed up to `u64` (slot/block) Block(u64), @@ -65,7 +66,6 @@ impl std::fmt::Debug for ChainEvent { .field(&ev.request_id()) .field(&ev.source_chain().as_str()) .finish(), - ChainEvent::CatchupCompleted => f.debug_tuple("CatchupCompleted").finish(), ChainEvent::Block(b) => write!(f, "Block({b})"), ChainEvent::ExecutionConfirmed { tx_id, @@ -91,13 +91,145 @@ pub enum ExecutionOutcome { Failed, } -#[allow(async_fn_in_trait)] +pub struct DisabledBufferedStream; + +#[async_trait] +impl ChainBufferedStream for DisabledBufferedStream { + type Block = (); + + async fn initial(&mut self) -> Option { + None + } + + async fn next(&mut self) -> Option { + None + } +} + +#[async_trait] +pub trait ChainBufferedStream: Send + 'static { + type Block: Send + 'static; + + async fn initial(&mut self) -> Option; + async fn next(&mut self) -> Option; +} + +#[async_trait] +pub trait ChainIndexer: Send + 'static { + type BufferedStream: ChainBufferedStream; + + async fn livestream(&mut self) -> anyhow::Result> { + Ok(None) + } + + fn buffered_item_height(block: &::Block) -> u64 { + let _ = block; + 0 + } + + async fn catchup_range(&mut self, _anchor_height: u64) -> Range { + 0..0 + } + + async fn process_catchup_on_height(&mut self, _height: u64) -> anyhow::Result<()> { + Ok(()) + } + + async fn process_buffered_block( + &mut self, + block: ::Block, + ) -> anyhow::Result<()> { + let _ = block; + Ok(()) + } + + fn retry_delay(&self) -> Duration { + Duration::from_millis(500) + } +} + +pub struct DisabledChainIndexer; + +#[async_trait] +impl ChainIndexer for DisabledChainIndexer { + type BufferedStream = DisabledBufferedStream; +} + +#[async_trait] pub trait ChainStream: Send + 'static { const CHAIN: Chain; - async fn start(&mut self) {} + type Indexer: ChainIndexer; + + async fn start(&mut self) -> anyhow::Result; async fn next_event(&mut self) -> Option; } +pub(crate) async fn catchup_then_livestream( + chain: Chain, + indexer: &mut I, + catchup_completed_tx: oneshot::Sender<()>, +) { + tracing::info!(%chain, "starting ChainStream catchup then livestream"); + + let buffered = match indexer.livestream().await { + Ok(buffered) => buffered, + Err(err) => { + tracing::error!(?err, %chain, "failed to initialize livestream"); + return; + } + }; + let Some(mut buffered) = buffered else { + let _ = catchup_completed_tx.send(()); + return; + }; + + let Some(anchor_block) = buffered.initial().await else { + tracing::warn!(%chain, "buffered livestream ended before anchor block"); + return; + }; + + let anchor_height = I::buffered_item_height(&anchor_block); + let catchup_range = indexer.catchup_range(anchor_height).await; + + for height in catchup_range { + loop { + match indexer.process_catchup_on_height(height).await { + Ok(()) => break, + Err(err) => { + tracing::warn!(?err, %chain, height, "catchup height processing failed; retrying"); + tokio::time::sleep(indexer.retry_delay()).await; + } + } + } + } + + let _ = catchup_completed_tx.send(()); + + let mut next_block = anchor_block; + loop { + if let Err(err) = indexer.process_buffered_block(next_block).await { + tracing::warn!(?err, %chain, "buffered block processing failed"); + } + + match buffered.next().await { + Some(block) => next_block = block, + None => break, + } + } +} + +pub async fn spawn_stream_indexer( + stream: &mut S, +) -> anyhow::Result> { + let mut indexer = stream.start().await?; + let chain = S::CHAIN; + + Ok(tokio::spawn(async move { + let (catchup_completed_tx, _catchup_completed_rx) = oneshot::channel(); + catchup_then_livestream(chain, &mut indexer, catchup_completed_tx).await; + })) +} + /// Shared indexer loop: recovers backlog then processes events from the stream pub async fn run_stream( mut stream: S, @@ -108,8 +240,7 @@ pub async fn run_stream( node_client: NodeClient, ) { let chain = S::CHAIN; - - tracing::info!(%chain, "starting indexer loop"); + tracing::info!(%chain, "starting stream"); let mut recovered = recover_backlog( &backlog, @@ -121,37 +252,28 @@ pub async fn run_stream( ) .await; - // NOTE: we need to start after we recover entries from backlog and starting the run_stream task - // such that we can guarantee getting the CatchupCompleted event from this task to modify the - // recovered entries. - stream.start().await; + let mut indexer = match stream.start().await { + Ok(indexer) => indexer, + Err(err) => { + tracing::error!(?err, %chain, "failed to start stream"); + return; + } + }; + + let (catchup_completed_tx, mut catchup_completed_rx) = oneshot::channel(); + tokio::spawn(async move { + catchup_then_livestream(chain, &mut indexer, catchup_completed_tx).await; + }); - while let Some(event) = stream.next_event().await { - match event { - ChainEvent::SignRequest(req) => { - // process sign request (insert into backlog + send sign request) - if let Err(err) = process_sign_request(req, sign_tx.clone(), backlog.clone()).await - { - tracing::error!(?err, chain = %chain, "failed to process sign request"); - } - } - ChainEvent::Respond(ev) => { - if let Err(err) = - process_respond_event(ev, sign_tx.clone(), &mut contract_watcher, &backlog) - .await - { - tracing::error!(?err, chain = %chain, "failed to process respond event"); - } - } - ChainEvent::RespondBidirectional(ev) => { - if let Err(err) = - process_respond_bidirectional_event(ev, sign_tx.clone(), &backlog).await + let mut catchup_completed = false; + loop { + tokio::select! { + result = &mut catchup_completed_rx, if !catchup_completed => { + catchup_completed = true; + + if result.is_ok() + && recovered.requeue_mode == crate::backlog::RecoveryRequeueMode::AfterCatchup { - tracing::error!(?err, chain = %chain, "failed to process respond bidirectional event"); - } - } - ChainEvent::CatchupCompleted => { - if recovered.requeue_mode == crate::backlog::RecoveryRequeueMode::AfterCatchup { requeue_recovered_sign_requests( &backlog, chain, @@ -161,36 +283,67 @@ pub async fn run_stream( .await; recovered.pending.clear(); } + + continue; } - ChainEvent::Block(block) => { - // central checkpointing for all chains - if let Some(checkpoint) = backlog.set_processed_block(S::CHAIN, block).await { - tracing::info!(block, ?checkpoint, chain = %chain, "created checkpoint"); - } - crate::metrics::indexers::LATEST_BLOCK_NUMBER - .with_label_values(&[S::CHAIN.as_str(), "indexed"]) - .set(block as i64); - } - ChainEvent::ExecutionConfirmed { - tx_id, - sign_id, - source_chain, - block_height, - result, - } => { - if let Err(err) = process_execution_confirmed( - tx_id, - sign_id, - source_chain, - block_height, - result, - &backlog, - sign_tx.clone(), - S::CHAIN, - ) - .await - { - tracing::error!(?err, chain = %chain, "failed to process execution confirmation"); + event = stream.next_event() => { + let Some(event) = event else { + tracing::info!(%chain, "stream dropped event channel"); + break; + }; + + match event { + ChainEvent::SignRequest(req) => { + if let Err(err) = process_sign_request(req, sign_tx.clone(), backlog.clone()).await + { + tracing::error!(?err, %chain, "failed to process sign request"); + } + } + ChainEvent::Respond(ev) => { + if let Err(err) = + process_respond_event(ev, sign_tx.clone(), &mut contract_watcher, &backlog) + .await + { + tracing::error!(?err, %chain, "failed to process respond event"); + } + } + ChainEvent::RespondBidirectional(ev) => { + if let Err(err) = + process_respond_bidirectional_event(ev, sign_tx.clone(), &backlog).await + { + tracing::error!(?err, %chain, "failed to process respond bidirectional event"); + } + } + ChainEvent::Block(block) => { + if let Some(checkpoint) = backlog.set_processed_block(S::CHAIN, block).await { + tracing::info!(block, ?checkpoint, %chain, "created checkpoint"); + } + crate::metrics::indexers::LATEST_BLOCK_NUMBER + .with_label_values(&[S::CHAIN.as_str(), "indexed"]) + .set(block as i64); + } + ChainEvent::ExecutionConfirmed { + tx_id, + sign_id, + source_chain, + block_height, + result, + } => { + if let Err(err) = process_execution_confirmed( + tx_id, + sign_id, + source_chain, + block_height, + result, + &backlog, + sign_tx.clone(), + S::CHAIN, + ) + .await + { + tracing::error!(?err, %chain, "failed to process execution confirmation"); + } + } } } } @@ -219,6 +372,8 @@ mod tests { use mpc_primitives::SignId; use mpc_primitives::Signature; use near_primitives::types::AccountId; + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::timeout; @@ -227,8 +382,16 @@ mod tests { events: Vec>, } + #[async_trait] impl ChainStream for TestEventStream { const CHAIN: Chain = Chain::Solana; + + type Indexer = DisabledChainIndexer; + + async fn start(&mut self) -> anyhow::Result { + Ok(DisabledChainIndexer) + } + async fn next_event(&mut self) -> Option { if self.events.is_empty() { return None; @@ -237,6 +400,206 @@ mod tests { } } + struct TestBufferedStream { + items: Vec, + } + + #[async_trait] + impl ChainBufferedStream for TestBufferedStream { + type Block = u64; + + async fn initial(&mut self) -> Option { + if self.items.is_empty() { + return None; + } + Some(self.items.remove(0)) + } + + async fn next(&mut self) -> Option { + if self.items.is_empty() { + return None; + } + Some(self.items.remove(0)) + } + } + + #[derive(Clone)] + struct TestLinearControl { + persisted_height: Option, + live_items: Vec, + catchup_failures: Arc>>, + live_failures: Arc>>, + } + + impl TestLinearControl { + fn new(persisted_height: Option, live_items: Vec) -> Self { + Self { + persisted_height, + live_items, + catchup_failures: Arc::new(Mutex::new(HashMap::new())), + live_failures: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn fail_catchup_once(self, height: u64) -> Self { + self.catchup_failures.lock().unwrap().insert(height, 1); + self + } + + fn fail_live_once(self, height: u64) -> Self { + self.live_failures.lock().unwrap().insert(height, 1); + self + } + + fn consume_failure(map: &Mutex>, height: u64) -> bool { + let mut failures = map.lock().unwrap(); + let Some(remaining) = failures.get_mut(&height) else { + return false; + }; + if *remaining == 0 { + return false; + } + *remaining -= 1; + true + } + + fn retry_delay(&self) -> Duration { + Duration::from_millis(1) + } + } + + struct TestLinearStream { + control: TestLinearControl, + rx: mpsc::Receiver, + tx: mpsc::Sender, + } + + impl TestLinearStream { + fn new(control: TestLinearControl) -> Self { + let (tx, rx) = mpsc::channel(16); + Self { control, rx, tx } + } + } + + struct TestLinearIndexer { + control: TestLinearControl, + tx: mpsc::Sender, + } + + #[async_trait] + impl ChainIndexer for TestLinearIndexer { + type BufferedStream = TestBufferedStream; + + async fn livestream(&mut self) -> anyhow::Result> { + Ok(Some(TestBufferedStream { + items: self.control.live_items.clone(), + })) + } + + fn buffered_item_height( + block: &::Block, + ) -> u64 { + *block + } + + async fn catchup_range(&mut self, anchor_height: u64) -> Range { + let start = self + .control + .persisted_height + .map(|height| height + 1) + .unwrap_or(anchor_height); + start..anchor_height + } + + async fn process_catchup_on_height(&mut self, height: u64) -> anyhow::Result<()> { + if TestLinearControl::consume_failure(&self.control.catchup_failures, height) { + anyhow::bail!("synthetic catchup failure at height {height}"); + } + self.tx.send(ChainEvent::Block(height)).await?; + Ok(()) + } + + async fn process_buffered_block( + &mut self, + block: ::Block, + ) -> anyhow::Result<()> { + if TestLinearControl::consume_failure(&self.control.live_failures, block) { + anyhow::bail!("synthetic live failure at height {block}"); + } + self.tx.send(ChainEvent::Block(block)).await?; + Ok(()) + } + + fn retry_delay(&self) -> Duration { + self.control.retry_delay() + } + } + + #[async_trait] + impl ChainStream for TestLinearStream { + const CHAIN: Chain = Chain::Ethereum; + type Indexer = TestLinearIndexer; + + async fn start(&mut self) -> anyhow::Result { + Ok(TestLinearIndexer { + control: self.control.clone(), + tx: self.tx.clone(), + }) + } + + async fn next_event(&mut self) -> Option { + self.rx.recv().await + } + } + + #[tokio::test] + async fn test_run_linearized_source_orders_catchup_before_live() { + let mut stream = TestLinearStream::new(TestLinearControl::new(Some(1), vec![4, 5])); + let mut indexer = stream.start().await.unwrap(); + let (tx, _rx) = oneshot::channel(); + catchup_then_livestream(Chain::Ethereum, &mut indexer, tx).await; + + let mut observed = Vec::new(); + while let Some(event) = timeout(Duration::from_millis(20), stream.next_event()) + .await + .ok() + .flatten() + { + observed.push(event); + } + + assert!(matches!(observed[0], ChainEvent::Block(2))); + assert!(matches!(observed[1], ChainEvent::Block(3))); + assert!(matches!(observed[2], ChainEvent::Block(4))); + assert!(matches!(observed[3], ChainEvent::Block(5))); + } + + #[tokio::test] + async fn test_run_linearized_source_retries_without_reordering() { + let mut stream = TestLinearStream::new( + TestLinearControl::new(Some(1), vec![4, 5]) + .fail_catchup_once(3) + .fail_live_once(4), + ); + let mut indexer = stream.start().await.unwrap(); + let (tx, _rx) = oneshot::channel(); + catchup_then_livestream(Chain::Ethereum, &mut indexer, tx).await; + + let mut observed = Vec::new(); + while let Some(event) = timeout(Duration::from_millis(20), stream.next_event()) + .await + .ok() + .flatten() + { + observed.push(event); + } + + assert!(matches!(observed[0], ChainEvent::Block(2))); + assert!(matches!(observed[1], ChainEvent::Block(3))); + assert!(matches!(observed[2], ChainEvent::Block(4))); + assert!(matches!(observed[3], ChainEvent::Block(5))); + } + #[tokio::test] async fn test_stream_handles_sign_and_respond() { let backlog = Backlog::new(); @@ -329,11 +692,14 @@ mod tests { event: Option, } + #[async_trait] impl ChainStream for StartAwareStream { const CHAIN: Chain = Chain::Solana; + type Indexer = DisabledChainIndexer; - async fn start(&mut self) { + async fn start(&mut self) -> anyhow::Result { self.started = true; + Ok(DisabledChainIndexer) } async fn next_event(&mut self) -> Option { @@ -414,8 +780,15 @@ mod tests { rx: mpsc::Receiver, } + #[async_trait] impl ChainStream for LocalStream { const CHAIN: Chain = Chain::Solana; + type Indexer = DisabledChainIndexer; + + async fn start(&mut self) -> anyhow::Result { + Ok(DisabledChainIndexer) + } + async fn next_event(&mut self) -> Option { self.rx.recv().await } @@ -655,8 +1028,14 @@ mod tests { events: Vec>, } + #[async_trait] impl ChainStream for EthereumLocalStream { const CHAIN: Chain = Chain::Ethereum; + type Indexer = DisabledChainIndexer; + + async fn start(&mut self) -> anyhow::Result { + Ok(DisabledChainIndexer) + } async fn next_event(&mut self) -> Option { if self.events.is_empty() { @@ -673,11 +1052,7 @@ mod tests { }); let client = EthereumLocalStream { - events: vec![ - Some(ChainEvent::Respond(respond)), - Some(ChainEvent::CatchupCompleted), - None, - ], + events: vec![Some(ChainEvent::Respond(respond)), None], }; let backlog = Backlog::persisted(storage); diff --git a/integration-tests/tests/cases/ethereum_stream.rs b/integration-tests/tests/cases/ethereum_stream.rs index ed9abf03..16fd5659 100644 --- a/integration-tests/tests/cases/ethereum_stream.rs +++ b/integration-tests/tests/cases/ethereum_stream.rs @@ -1,5 +1,6 @@ use alloy::primitives::{Address as AlloyAddress, B256}; use anyhow::{Context, Result}; +use cait_sith::protocol::Participant; use ethers::middleware::{Middleware, SignerMiddleware}; use ethers::providers::{Http, Provider}; use ethers::signers::{LocalWallet, Signer}; @@ -13,13 +14,22 @@ use integration_tests::eth::{ use k256::elliptic_curve::sec1::ToEncodedPoint as _; use mpc_node::backlog::Backlog; use mpc_node::indexer_eth::{EthConfig, EthereumStream}; -use mpc_node::protocol::{Chain, SignKind}; +use mpc_node::mesh::{connection::NodeStatus, MeshState}; +use mpc_node::node_client::NodeClient; +use mpc_node::protocol::{Chain, ParticipantInfo, Sign, SignKind}; +use mpc_node::rpc::ContractStateWatcher; +use mpc_node::storage::checkpoint_storage::CheckpointStorage; +use mpc_node::stream::ops::SignBidirectionalEvent as NodeSignBidirectionalEvent; use mpc_node::stream::ops::SignatureRespondedEvent; -use mpc_node::stream::{ChainEvent, ChainStream}; -use mpc_primitives::{SignId, LATEST_MPC_KEY_VERSION}; +use mpc_node::stream::spawn_stream_indexer; +use mpc_node::stream::{run_stream, ChainEvent, ChainStream}; +use mpc_node::util::current_unix_timestamp; +use mpc_primitives::{SignArgs, SignId, LATEST_MPC_KEY_VERSION}; +use near_primitives::types::AccountId; use rand::thread_rng; use std::sync::Arc; use std::time::Duration; +use tokio::sync::{mpsc, watch}; use tokio::time::timeout; fn signature_deposit() -> U256 { @@ -163,7 +173,156 @@ async fn submit_sign_request( Ok(receipt.transaction_hash) } -async fn next_event_within(client: &mut EthereumStream, duration: Duration) -> Result { +async fn submit_sign_request_with_block( + ctx: &EthereumTestEnvironment, + payload: [u8; 32], + path: &str, +) -> Result<(H256, u64)> { + let contract = ctx.contract(); + let sign_request = SignRequest { + payload, + path: path.to_string(), + key_version: LATEST_MPC_KEY_VERSION, + algo: "secp256k1".to_string(), + dest: "".to_string(), + params: "".to_string(), + }; + + let call = contract.sign(sign_request).value(signature_deposit()); + let pending_tx = call.send().await?; + let receipt = pending_tx + .await + .context("failed to mine sign transaction")? + .context("sign transaction dropped from mempool")?; + + Ok(( + receipt.transaction_hash, + receipt + .block_number + .context("sign transaction missing block number")? + .as_u64(), + )) +} + +async fn submit_eth_transfer(ctx: &EthereumTestEnvironment) -> Result { + let pending_tx = ctx + .signer + .send_transaction( + TransactionRequest::new().to(ctx.wallet).value(U256::zero()), + None, + ) + .await?; + let receipt = pending_tx + .await + .context("failed to mine eth transfer transaction")? + .context("eth transfer transaction dropped from mempool")?; + Ok(receipt.transaction_hash) +} + +async fn submit_respond_for_request_id( + contract: ChainSignaturesContract>, + request_id: [u8; 32], +) -> Result +where + M: Middleware + 'static, +{ + let enc = k256::ProjectivePoint::GENERATOR.to_encoded_point(false); + let x = enc.x().expect("generator must have x coordinate"); + let y = enc.y().expect("generator must have y coordinate"); + let s = U256::from_big_endian(k256::Scalar::from(11u64).to_bytes().as_slice()); + + let response = chain_signatures_contract::Response { + request_id, + signature: chain_signatures_contract::Signature { + big_r: chain_signatures_contract::AffinePoint { + x: U256::from_big_endian(x), + y: U256::from_big_endian(y), + }, + s, + recovery_id: 1, + }, + }; + + let respond_call = contract.respond(vec![response]); + let pending_tx = respond_call.send().await?; + let receipt = pending_tx + .await + .context("respond transaction execution failed")? + .context("respond transaction dropped from mempool")?; + Ok(receipt.transaction_hash) +} + +async fn next_sign_message_within( + rx: &mut mpsc::Receiver, + duration: Duration, +) -> Result { + timeout(duration, rx.recv()) + .await + .context("timed out waiting for sign message")? + .context("sign channel closed unexpectedly") +} + +fn test_sign_args(seed: u8) -> SignArgs { + SignArgs { + entropy: [seed; 32], + epsilon: k256::Scalar::from(1u64), + payload: k256::Scalar::from((seed as u64) + 1), + path: format!("test-path-{seed}"), + key_version: LATEST_MPC_KEY_VERSION, + } +} + +fn test_bidirectional_event() -> NodeSignBidirectionalEvent { + let mut rlp_s = rlp::RlpStream::new_list(9); + rlp_s.append(&0u64); + rlp_s.append(&0u64); + rlp_s.append(&0u64); + rlp_s.append(&Vec::::new()); + rlp_s.append(&0u64); + rlp_s.append(&Vec::::new()); + rlp_s.append(&1u64); + rlp_s.append(&0u64); + rlp_s.append(&0u64); + + NodeSignBidirectionalEvent::Solana(signet_program::SignBidirectionalEvent { + sender: solana_sdk::pubkey::Pubkey::new_unique(), + serialized_transaction: rlp_s.out().to_vec(), + dest: Chain::Ethereum.to_string(), + caip2_id: "eip155:31337".to_string(), + key_version: LATEST_MPC_KEY_VERSION, + deposit: 1, + path: "bidirectional-test-path".to_string(), + algo: "secp256k1".to_string(), + params: "{}".to_string(), + program_id: solana_sdk::pubkey::Pubkey::new_unique(), + output_deserialization_schema: vec![], + respond_serialization_schema: vec![], + }) +} + +struct StartedEthereumStream { + stream: EthereumStream, + _indexer_task: tokio::task::JoinHandle<()>, +} + +impl std::ops::Deref for StartedEthereumStream { + type Target = EthereumStream; + + fn deref(&self) -> &Self::Target { + &self.stream + } +} + +impl std::ops::DerefMut for StartedEthereumStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.stream + } +} + +async fn next_event_within( + client: &mut StartedEthereumStream, + duration: Duration, +) -> Result { timeout(duration, async { loop { if let Some(event) = client.next_event().await { @@ -178,10 +337,252 @@ async fn next_event_within(client: &mut EthereumStream, duration: Duration) -> R async fn stream_ethereum( ctx: &EthereumTestEnvironment, backlog: Backlog, -) -> Result { +) -> Result { let mut stream = EthereumStream::new(Some(ctx.config(true)), backlog).await?; - ChainStream::start(&mut stream).await; - Ok(stream) + let indexer_task = spawn_stream_indexer(&mut stream).await?; + Ok(StartedEthereumStream { + stream, + _indexer_task: indexer_task, + }) +} + +#[test_log::test(tokio::test)] +async fn test_ethereum_stream_resume_starts_after_checkpoint_height() -> Result<()> { + let ctx = EthereumTestEnvironment::new().await?; + let storage = CheckpointStorage::in_memory(); + let seeded_backlog = Backlog::persisted(storage.clone()); + + let replayed_payload = [0x31; 32]; + let (_, processed_height) = + submit_sign_request_with_block(&ctx, replayed_payload, "resume-processed-path").await?; + + seeded_backlog + .set_processed_block(Chain::Ethereum, processed_height) + .await; + seeded_backlog.checkpoint(Chain::Ethereum).await; + + let mut expected_payload = [0x32; 32]; + loop { + let (_, block_height) = + submit_sign_request_with_block(&ctx, expected_payload, "resume-new-path").await?; + if block_height > processed_height { + break; + } + + expected_payload[0] = expected_payload[0].saturating_add(1); + } + + let backlog = Backlog::persisted(storage); + let stream = EthereumStream::new(Some(ctx.config(true)), backlog.clone()).await?; + let (sign_tx, mut sign_rx) = mpsc::channel(16); + let (contract_watcher, _contract_tx) = ContractStateWatcher::with_running( + &"test.near".parse::().unwrap(), + k256::ProjectivePoint::GENERATOR.to_affine(), + 1, + Default::default(), + ); + + let mut mesh_state = MeshState::default(); + let mut info = ParticipantInfo::new(0); + info.url = "http://127.0.0.1:1".to_string(); + mesh_state.update(Participant::from(0u32), NodeStatus::Active, info); + let (_mesh_tx, mesh_rx) = watch::channel(mesh_state); + + let run_handle = tokio::spawn(run_stream( + stream, + sign_tx, + backlog, + contract_watcher, + mesh_rx, + NodeClient::new(&Default::default()), + )); + + let mut saw_replayed_payload = false; + let mut saw_expected_payload = false; + for _ in 0..12 { + match next_sign_message_within(&mut sign_rx, Duration::from_secs(10)).await? { + Sign::Request(req) => { + let payload: [u8; 32] = req.args.payload.to_bytes().into(); + if payload == replayed_payload { + saw_replayed_payload = true; + } + if payload == expected_payload { + saw_expected_payload = true; + break; + } + } + _ => continue, + } + } + + run_handle.abort(); + + assert!( + !saw_replayed_payload, + "stream replayed the stored processed block" + ); + assert!( + saw_expected_payload, + "stream did not catch up the next block" + ); + Ok(()) +} + +#[test_log::test(tokio::test)] +async fn test_ethereum_stream_linear_catchup_from_checkpoint() -> Result<()> { + let ctx = EthereumTestEnvironment::new().await?; + + let responder_wallet = LocalWallet::new(&mut thread_rng()).with_chain_id(ctx.sandbox.chain_id); + let responder_address = responder_wallet.address(); + let responder_provider = + Provider::::try_from(ctx.sandbox.external_http_endpoint.as_str())?; + let responder_signer: Arc, LocalWallet>> = + Arc::new(SignerMiddleware::new(responder_provider, responder_wallet)); + let fund_tx = TransactionRequest::new() + .to(responder_address) + .value(U256::from(1_000_000_000_000_000u64)); + let pending_fund = ctx.signer.send_transaction(fund_tx, None).await?; + let _ = pending_fund + .await + .context("failed to mine responder funding transaction")? + .context("responder funding transaction dropped from mempool")?; + + let checkpoint_height = ctx.signer.get_block_number().await?.as_u64(); + let checkpoint_nonce = ctx + .signer + .get_transaction_count(ctx.wallet, None) + .await? + .as_u64(); + + let storage = CheckpointStorage::in_memory(); + let seeded_backlog = Backlog::persisted(storage.clone()); + + let resolved_sign_id = SignId::new([0x11; 32]); + let requeued_sign_id = SignId::new([0x22; 32]); + seeded_backlog + .insert(mpc_node::protocol::IndexedSignRequest::sign( + resolved_sign_id, + test_sign_args(0x11), + Chain::Ethereum, + current_unix_timestamp(), + )) + .await; + seeded_backlog + .insert(mpc_node::protocol::IndexedSignRequest::sign( + requeued_sign_id, + test_sign_args(0x22), + Chain::Ethereum, + current_unix_timestamp(), + )) + .await; + seeded_backlog + .set_processed_block(Chain::Ethereum, checkpoint_height) + .await; + seeded_backlog.checkpoint(Chain::Ethereum).await; + + let backlog = Backlog::persisted(storage.clone()); + + let execution_sign_id = SignId::new([0x33; 32]); + backlog + .insert(mpc_node::protocol::IndexedSignRequest::sign_bidirectional( + execution_sign_id, + test_sign_args(0x33), + Chain::Solana, + current_unix_timestamp(), + test_bidirectional_event(), + )) + .await; + + let execution_tx = mpc_node::sign_bidirectional::BidirectionalTx { + id: mpc_node::sign_bidirectional::BidirectionalTxId(B256::from([0x44; 32])), + sender: [0u8; 32], + serialized_transaction: vec![], + source_chain: Chain::Solana, + target_chain: Chain::Ethereum, + caip2_id: "eip155:31337".to_string(), + key_version: LATEST_MPC_KEY_VERSION, + deposit: 1, + path: "bidirectional-test-path".to_string(), + algo: "secp256k1".to_string(), + dest: Chain::Ethereum.to_string(), + params: "{}".to_string(), + output_deserialization_schema: vec![], + respond_serialization_schema: vec![], + request_id: execution_sign_id.request_id, + from_address: AlloyAddress::from_slice(ctx.wallet.as_bytes()), + nonce: checkpoint_nonce, + status: mpc_node::sign_bidirectional::SignStatus::PendingExecution, + }; + backlog + .advance(Chain::Solana, execution_sign_id, execution_tx) + .await + .context("failed to seed execution watcher")?; + + let responder_contract = + ChainSignaturesContract::new(ctx.contract_address, responder_signer.clone().into()); + + submit_respond_for_request_id(responder_contract, resolved_sign_id.request_id).await?; + submit_eth_transfer(&ctx).await?; + let catchup_payload = [0x55; 32]; + submit_sign_request(&ctx, catchup_payload, "catchup-linear-path").await?; + + let stream = EthereumStream::new(Some(ctx.config(true)), backlog.clone()).await?; + let (sign_tx, mut sign_rx) = mpsc::channel(16); + let (contract_watcher, _contract_tx) = ContractStateWatcher::with_running( + &"test.near".parse::().unwrap(), + k256::ProjectivePoint::GENERATOR.to_affine(), + 1, + Default::default(), + ); + + let mut mesh_state = MeshState::default(); + let mut info = ParticipantInfo::new(0); + info.url = "http://127.0.0.1:1".to_string(); + mesh_state.update(Participant::from(0u32), NodeStatus::Active, info); + let (_mesh_tx, mesh_rx) = watch::channel(mesh_state); + + let run_handle = tokio::spawn(run_stream( + stream, + sign_tx, + backlog.clone(), + contract_watcher, + mesh_rx, + NodeClient::new(&Default::default()), + )); + + let first = next_sign_message_within(&mut sign_rx, Duration::from_secs(20)).await?; + match first { + Sign::Completion(sign_id) => assert_eq!(sign_id, resolved_sign_id), + other => panic!("expected recovered respond completion first, got {other:?}"), + } + + let second = next_sign_message_within(&mut sign_rx, Duration::from_secs(20)).await?; + match second { + Sign::Request(req) => { + assert_eq!(req.id, execution_sign_id); + assert!(matches!(req.kind, SignKind::RespondBidirectional(_))); + assert_eq!(req.chain, Chain::Solana); + } + other => panic!("expected execution follow-up request second, got {other:?}"), + } + + let third = next_sign_message_within(&mut sign_rx, Duration::from_secs(20)).await?; + match third { + Sign::Request(req) => { + assert_eq!(req.chain, Chain::Ethereum); + assert_eq!(req.args.payload.to_bytes(), catchup_payload.into()); + } + other => panic!("expected catchup sign request third, got {other:?}"), + } + + let fourth = next_sign_message_within(&mut sign_rx, Duration::from_secs(20)).await?; + match fourth { + Sign::Request(req) => assert_eq!(req.id, requeued_sign_id), + other => panic!("expected deferred recovered requeue fourth, got {other:?}"), + } + + run_handle.abort(); + Ok(()) } #[test_log::test(tokio::test)] diff --git a/integration-tests/tests/cases/solana_stream.rs b/integration-tests/tests/cases/solana_stream.rs index 2b63dd70..5a0b9fcf 100644 --- a/integration-tests/tests/cases/solana_stream.rs +++ b/integration-tests/tests/cases/solana_stream.rs @@ -30,7 +30,7 @@ fn test_dependencies() -> (Backlog, watch::Receiver, NodeClient) { async fn stream_solana(config: SolConfig) -> Result { let mut stream = SolanaStream::new(Some(config)).context("failed to create SolanaStream")?; - ChainStream::start(&mut stream).await; + let _ = ChainStream::start(&mut stream).await?; Ok(stream) }