From af9d8a2e07ef70c7b58f049061ec1b0a7522482c Mon Sep 17 00:00:00 2001 From: Chris O'Neil Date: Wed, 1 Apr 2026 21:31:06 +0100 Subject: [PATCH] feat: add overall timeout for quote collection (120s) Individual quote requests have per-peer timeouts, but the connection establishment (connect_with_fallback) can take up to 80s per peer and isn't covered by the per-peer timeout. Without an overall timeout, a single unreachable peer causes the entire upload to stall indefinitely. Wraps the quote collection loop in a 120s tokio::time::timeout. If the timeout fires, the upload fails fast with a clear error message showing how many quotes were collected and which peers failed. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 6 --- ant-core/src/data/client/quote.rs | 71 +++++++++++++++++++++---------- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3460d52..6960265 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -900,8 +900,6 @@ dependencies = [ [[package]] name = "ant-node" version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bc2ac7c262accf6b4ab73448816662c921303c4a81932379b90edfdee3e4a5d" dependencies = [ "aes-gcm-siv", "blake3", @@ -4983,8 +4981,6 @@ dependencies = [ [[package]] name = "saorsa-core" version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19de0950a0c9c9a3f97681c539e513c52c909866caddef5d825fe2dcbd1b0173" dependencies = [ "anyhow", "async-trait", @@ -5097,8 +5093,6 @@ dependencies = [ [[package]] name = "saorsa-transport" version = "0.30.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbe38ad2be12e4be34e203e79a33eab1664c23a4b65a1ff4b08973ae9af01f01" dependencies = [ "anyhow", "async-trait", diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 680c247..cd9a552 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ 
-55,7 +55,11 @@ impl Client { ))); } - let timeout = Duration::from_secs(self.config().timeout_secs); + let per_peer_timeout = Duration::from_secs(self.config().timeout_secs); + // Overall timeout for collecting all quotes. Must accommodate + // connect_with_fallback cascade (direct 5s + hole-punch 15s×3 + relay 30s ≈ 80s) + // plus the per-peer quote timeout. 120s is generous. + let overall_timeout = Duration::from_secs(120); // Request quotes from all peers concurrently let mut quote_futures = FuturesUnordered::new(); @@ -90,7 +94,7 @@ impl Client { &peer_id_clone, message_bytes, request_id, - timeout, + per_peer_timeout, &addrs_clone, |body| match body { ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success { @@ -132,33 +136,56 @@ impl Client { quote_futures.push(quote_future); } - // Wait for all quote requests to complete or timeout. - // Early-return if CLOSE_GROUP_MAJORITY peers report already stored. + // Wait for all quote requests to complete, with an overall timeout + // to prevent indefinite stalls when connection establishment hangs. 
let mut quotes = Vec::with_capacity(CLOSE_GROUP_SIZE); let mut already_stored_count = 0usize; let mut failures: Vec<String> = Vec::new(); - while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { - match quote_result { - Ok((quote, price)) => { - quotes.push((peer_id, addrs, quote, price)); - } - Err(Error::AlreadyStored) => { - already_stored_count += 1; - debug!("Peer {peer_id} reports chunk already stored"); - if already_stored_count >= CLOSE_GROUP_MAJORITY { - info!( - "Chunk {} already stored ({already_stored_count}/{CLOSE_GROUP_SIZE} peers confirm)", - hex::encode(address) - ); - return Err(Error::AlreadyStored); + let collect_result: std::result::Result<Result<()>, _> = + tokio::time::timeout(overall_timeout, async { + while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await { + match quote_result { + Ok((quote, price)) => { + quotes.push((peer_id, addrs, quote, price)); + } + Err(Error::AlreadyStored) => { + already_stored_count += 1; + debug!("Peer {peer_id} reports chunk already stored"); + if already_stored_count >= CLOSE_GROUP_MAJORITY { + info!( + "Chunk {} already stored ({already_stored_count}/{CLOSE_GROUP_SIZE} peers confirm)", + hex::encode(address) + ); + return Err(Error::AlreadyStored); + } + } + Err(e) => { + warn!("Failed to get quote from {peer_id}: {e}"); + failures.push(format!("{peer_id}: {e}")); + } } } - Err(e) => { - warn!("Failed to get quote from {peer_id}: {e}"); - failures.push(format!("{peer_id}: {e}")); - } + Ok(()) + }) + .await; + + match collect_result { + Err(_elapsed) => { + warn!( + "Quote collection timed out after {:?} for address {}", + overall_timeout, + hex::encode(address) + ); + return Err(Error::Timeout(format!( + "Quote collection timed out after {:?}. Got {} quotes, need {CLOSE_GROUP_SIZE}. 
Failures: [{}]", + overall_timeout, + quotes.len(), + failures.join("; ") + ))); } + Ok(Err(e)) => return Err(e), // AlreadyStored early return + Ok(Ok(())) => {} // All futures completed normally } if quotes.len() >= CLOSE_GROUP_SIZE {