Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ant-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,7 @@ path = "tests/e2e_merkle.rs"
[[test]]
name = "e2e_huge_file"
path = "tests/e2e_huge_file.rs"

# NOTE(review): declared explicitly like the sibling e2e targets above —
# presumably so cargo builds it as its own integration-test binary; confirm
# tests/e2e_upload_costs.rs exists in the tree.
[[test]]
name = "e2e_upload_costs"
path = "tests/e2e_upload_costs.rs"
134 changes: 115 additions & 19 deletions ant-core/src/data/client/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use evmlib::wallet::PayForQuotesError;
use evmlib::{EncodedPeerId, PaymentQuote, ProofOfPayment, RewardsAddress};
use futures::stream::{self, StreamExt};
use std::collections::{HashMap, HashSet};
use tracing::{debug, info};
use std::time::Duration;
use tracing::{debug, info, warn};

/// Number of chunks per payment wave.
const PAYMENT_WAVE_SIZE: usize = 64;
Expand All @@ -39,7 +40,7 @@ pub struct PreparedChunk {
}

/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`].
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PaidChunk {
/// The chunk content bytes.
pub content: Bytes,
Expand All @@ -51,6 +52,15 @@ pub struct PaidChunk {
pub proof_bytes: Vec<u8>,
}

/// Result of storing one wave of paid chunks, with retry tracking.
///
/// Produced by `Client::store_paid_chunks` once its retry budget is
/// exhausted. Callers inspect `failed` to decide whether the wave (and
/// therefore the overall batch upload) succeeded, while `stored` preserves
/// partial progress even when some chunks could not be stored.
#[derive(Debug)]
pub struct WaveResult {
    /// Addresses of chunks stored successfully (accumulated across all attempts).
    pub stored: Vec<XorName>,
    /// Chunks that still failed after all retries, as `(address, last error message)` pairs.
    pub failed: Vec<(XorName, String)>,
}

/// Payment data for external signing.
///
/// Contains the information needed to construct and submit the on-chain
Expand Down Expand Up @@ -335,9 +345,20 @@ impl Client {
None => (self.prepare_wave(wave_chunks).await, None),
};

// Propagate store errors from previous wave.
if let Some(stored) = store_result {
all_addresses.extend(stored?);
// Track partial progress from previous wave.
if let Some(wave_result) = store_result {
all_addresses.extend(&wave_result.stored);
if !wave_result.failed.is_empty() {
let failed_count = wave_result.failed.len();
warn!("{failed_count} chunks failed to store after retries");
return Err(Error::PartialUpload {
stored: all_addresses.clone(),
stored_count: all_addresses.len(),
failed: wave_result.failed,
failed_count,
reason: "wave store failed after retries".into(),
});
}
}

let (prepared_chunks, already_stored) = prepare_result?;
Expand All @@ -358,7 +379,19 @@ impl Client {

// Store the last wave.
if let Some(paid_chunks) = pending_store {
all_addresses.extend(self.store_paid_chunks(paid_chunks).await?);
let wave_result = self.store_paid_chunks(paid_chunks).await;
all_addresses.extend(&wave_result.stored);
if !wave_result.failed.is_empty() {
let failed_count = wave_result.failed.len();
warn!("{failed_count} chunks failed to store after retries (final wave)");
return Err(Error::PartialUpload {
stored: all_addresses.clone(),
stored_count: all_addresses.len(),
failed: wave_result.failed,
failed_count,
reason: "final wave store failed after retries".into(),
});
}
}

info!("Batch upload complete: {} addresses", all_addresses.len());
Expand Down Expand Up @@ -400,20 +433,83 @@ impl Client {
}

/// Store a batch of paid chunks concurrently to their close groups.
pub(crate) async fn store_paid_chunks(
&self,
paid_chunks: Vec<PaidChunk>,
) -> Result<Vec<XorName>> {
let results: Vec<Result<XorName>> = stream::iter(paid_chunks)
.map(|chunk| async move {
self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
.await
})
.buffer_unordered(self.config().chunk_concurrency)
.collect()
.await;
///
/// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s).
/// Returns a [`WaveResult`] with both successes and failures so callers can
/// track partial progress instead of losing information about stored chunks.
pub(crate) async fn store_paid_chunks(&self, paid_chunks: Vec<PaidChunk>) -> WaveResult {
    /// Retry rounds after the initial attempt.
    const MAX_RETRIES: u32 = 3;
    /// Backoff base; doubled on each successive retry round.
    const BASE_DELAY_MS: u64 = 500;

    // Successes accumulate across rounds; at most every chunk succeeds.
    let mut stored = Vec::with_capacity(paid_chunks.len());
    let mut to_retry = paid_chunks;

    // attempt 0 is the initial try; attempts 1..=MAX_RETRIES are retries.
    for attempt in 0..=MAX_RETRIES {
        if attempt > 0 {
            // Backoff: 500ms, 1s, 2s for attempts 1, 2, 3.
            let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
            // Log before sleeping so the pause is explained in the logs,
            // not discovered after the fact.
            info!(
                "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks after {delay:?} backoff",
                to_retry.len()
            );
            tokio::time::sleep(delay).await;
        }

        let results: Vec<(PaidChunk, Result<XorName>)> = stream::iter(to_retry)
            .map(|chunk| {
                // Bytes::clone is O(1) (ref-counted). Only proof_bytes (~30KB)
                // and quoted_peers are actually copied here; the clone is kept
                // so a failed chunk can be re-queued for the next round.
                let chunk_clone = chunk.clone();
                async move {
                    let result = self
                        .chunk_put_to_close_group(
                            chunk.content,
                            chunk.proof_bytes,
                            &chunk.quoted_peers,
                        )
                        .await;
                    (chunk_clone, result)
                }
            })
            .buffer_unordered(self.config().chunk_concurrency)
            .collect()
            .await;

        // Partition this round into successes and retry candidates.
        let mut failed_this_round = Vec::new();
        for (chunk, result) in results {
            match result {
                Ok(name) => stored.push(name),
                Err(e) => failed_this_round.push((chunk, e.to_string())),
            }
        }

        if failed_this_round.is_empty() {
            return WaveResult {
                stored,
                failed: Vec::new(),
            };
        }

        if attempt == MAX_RETRIES {
            // Out of retries: surface each survivor's address and last error.
            let failed = failed_this_round
                .into_iter()
                .map(|(chunk, err)| (chunk.address, err))
                .collect();
            return WaveResult { stored, failed };
        }

        warn!(
            "{} chunks failed on attempt {}, will retry",
            failed_this_round.len(),
            attempt + 1
        );
        to_retry = failed_this_round
            .into_iter()
            .map(|(chunk, _)| chunk)
            .collect();
    }

    // Every path through the final iteration (attempt == MAX_RETRIES) returns,
    // so falling out of the loop is a logic error. Fail loudly rather than
    // fabricating an empty-failure result that would mask dropped chunks.
    unreachable!("store_paid_chunks loop always returns on its final attempt")
}
}

Expand Down
16 changes: 12 additions & 4 deletions ant-core/src/data/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! For file-based streaming uploads that avoid loading the entire
//! file into memory, see the `file` module.

use crate::data::client::batch::PaymentIntent;
use crate::data::client::batch::{PaymentIntent, PreparedChunk};
use crate::data::client::file::PreparedUpload;
use crate::data::client::merkle::PaymentMode;
use crate::data::client::Client;
Expand Down Expand Up @@ -177,9 +177,16 @@ impl Client {
.map(|chunk| chunk.content)
.collect();

let mut prepared_chunks = Vec::with_capacity(chunk_contents.len());
for content in chunk_contents {
if let Some(prepared) = self.prepare_chunk_payment(content).await? {
let concurrency = self.config().chunk_concurrency;
let results: Vec<Result<Option<PreparedChunk>>> = futures::stream::iter(chunk_contents)
.map(|content| async move { self.prepare_chunk_payment(content).await })
.buffer_unordered(concurrency)
.collect()
.await;

let mut prepared_chunks = Vec::with_capacity(results.len());
for result in results {
if let Some(prepared) = result? {
prepared_chunks.push(prepared);
}
}
Expand All @@ -196,6 +203,7 @@ impl Client {
data_map,
prepared_chunks,
payment_intent,
payment_mode: PaymentMode::Single,
})
}

Expand Down
Loading
Loading