Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 49 additions & 22 deletions ant-core/src/data/client/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ impl Client {
)));
}

let timeout = Duration::from_secs(self.config().timeout_secs);
let per_peer_timeout = Duration::from_secs(self.config().timeout_secs);
// Overall timeout for collecting all quotes. Must accommodate
// connect_with_fallback cascade (direct 5s + hole-punch 15s×3 + relay 30s ≈ 80s)
// plus the per-peer quote timeout. 120s is generous.
let overall_timeout = Duration::from_secs(120);

// Request quotes from all peers concurrently
let mut quote_futures = FuturesUnordered::new();
Expand Down Expand Up @@ -90,7 +94,7 @@ impl Client {
&peer_id_clone,
message_bytes,
request_id,
timeout,
per_peer_timeout,
&addrs_clone,
|body| match body {
ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
Expand Down Expand Up @@ -132,33 +136,56 @@ impl Client {
quote_futures.push(quote_future);
}

// Wait for all quote requests to complete or timeout.
// Early-return if CLOSE_GROUP_MAJORITY peers report already stored.
// Wait for all quote requests to complete, with an overall timeout
// to prevent indefinite stalls when connection establishment hangs.
let mut quotes = Vec::with_capacity(CLOSE_GROUP_SIZE);
let mut already_stored_count = 0usize;
let mut failures: Vec<String> = Vec::new();

while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
match quote_result {
Ok((quote, price)) => {
quotes.push((peer_id, addrs, quote, price));
}
Err(Error::AlreadyStored) => {
already_stored_count += 1;
debug!("Peer {peer_id} reports chunk already stored");
if already_stored_count >= CLOSE_GROUP_MAJORITY {
info!(
"Chunk {} already stored ({already_stored_count}/{CLOSE_GROUP_SIZE} peers confirm)",
hex::encode(address)
);
return Err(Error::AlreadyStored);
let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
tokio::time::timeout(overall_timeout, async {
while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
match quote_result {
Ok((quote, price)) => {
quotes.push((peer_id, addrs, quote, price));
}
Err(Error::AlreadyStored) => {
already_stored_count += 1;
debug!("Peer {peer_id} reports chunk already stored");
if already_stored_count >= CLOSE_GROUP_MAJORITY {
info!(
"Chunk {} already stored ({already_stored_count}/{CLOSE_GROUP_SIZE} peers confirm)",
hex::encode(address)
);
return Err(Error::AlreadyStored);
}
}
Err(e) => {
warn!("Failed to get quote from {peer_id}: {e}");
failures.push(format!("{peer_id}: {e}"));
}
}
}
Err(e) => {
warn!("Failed to get quote from {peer_id}: {e}");
failures.push(format!("{peer_id}: {e}"));
}
Ok(())
})
.await;

match collect_result {
Err(_elapsed) => {
warn!(
"Quote collection timed out after {:?} for address {}",
overall_timeout,
hex::encode(address)
);
return Err(Error::Timeout(format!(
"Quote collection timed out after {:?}. Got {} quotes, need {CLOSE_GROUP_SIZE}. Failures: [{}]",
overall_timeout,
quotes.len(),
failures.join("; ")
)));
}
Ok(Err(e)) => return Err(e), // AlreadyStored early return
Ok(Ok(())) => {} // All futures completed normally
}

if quotes.len() >= CLOSE_GROUP_SIZE {
Expand Down
Loading