diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index 686531e..a9dbdc9 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -97,3 +97,7 @@ path = "tests/e2e_merkle.rs" [[test]] name = "e2e_huge_file" path = "tests/e2e_huge_file.rs" + +[[test]] +name = "e2e_upload_costs" +path = "tests/e2e_upload_costs.rs" diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs index 2afdbbd..8a94ba6 100644 --- a/ant-core/src/data/client/batch.rs +++ b/ant-core/src/data/client/batch.rs @@ -17,7 +17,8 @@ use evmlib::wallet::PayForQuotesError; use evmlib::{EncodedPeerId, PaymentQuote, ProofOfPayment, RewardsAddress}; use futures::stream::{self, StreamExt}; use std::collections::{HashMap, HashSet}; -use tracing::{debug, info}; +use std::time::Duration; +use tracing::{debug, info, warn}; /// Number of chunks per payment wave. const PAYMENT_WAVE_SIZE: usize = 64; @@ -38,7 +39,7 @@ pub struct PreparedChunk { } /// Chunk paid but not yet stored. Produced by [`Client::batch_pay`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PaidChunk { /// The chunk content bytes. pub content: Bytes, @@ -50,6 +51,15 @@ pub struct PaidChunk { pub proof_bytes: Vec, } +/// Result of storing a wave of paid chunks, with retry tracking. +#[derive(Debug)] +pub struct WaveResult { + /// Successfully stored chunk addresses. + pub stored: Vec, + /// Chunks that failed to store after all retries. + pub failed: Vec<(XorName, String)>, +} + /// Payment data for external signing. /// /// Contains the information needed to construct and submit the on-chain @@ -311,9 +321,20 @@ impl Client { None => (self.prepare_wave(wave_chunks).await, None), }; - // Propagate store errors from previous wave. - if let Some(stored) = store_result { - all_addresses.extend(stored?); + // Track partial progress from previous wave. 
+ if let Some(wave_result) = store_result { + all_addresses.extend(&wave_result.stored); + if !wave_result.failed.is_empty() { + let failed_count = wave_result.failed.len(); + warn!("{failed_count} chunks failed to store after retries"); + return Err(Error::PartialUpload { + stored: all_addresses.clone(), + stored_count: all_addresses.len(), + failed: wave_result.failed, + failed_count, + reason: "wave store failed after retries".into(), + }); + } } let (prepared_chunks, already_stored) = prepare_result?; @@ -334,7 +355,19 @@ impl Client { // Store the last wave. if let Some(paid_chunks) = pending_store { - all_addresses.extend(self.store_paid_chunks(paid_chunks).await?); + let wave_result = self.store_paid_chunks(paid_chunks).await; + all_addresses.extend(&wave_result.stored); + if !wave_result.failed.is_empty() { + let failed_count = wave_result.failed.len(); + warn!("{failed_count} chunks failed to store after retries (final wave)"); + return Err(Error::PartialUpload { + stored: all_addresses.clone(), + stored_count: all_addresses.len(), + failed: wave_result.failed, + failed_count, + reason: "final wave store failed after retries".into(), + }); + } } info!("Batch upload complete: {} addresses", all_addresses.len()); @@ -376,20 +409,81 @@ impl Client { } /// Store a batch of paid chunks concurrently to their close groups. - pub(crate) async fn store_paid_chunks( - &self, - paid_chunks: Vec, - ) -> Result> { - let results: Vec> = stream::iter(paid_chunks) - .map(|chunk| async move { - self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers) - .await - }) - .buffer_unordered(self.config().chunk_concurrency) - .collect() - .await; + /// + /// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s). + /// Returns a [`WaveResult`] with both successes and failures so callers can + /// track partial progress instead of losing information about stored chunks. 
+    pub(crate) async fn store_paid_chunks(&self, paid_chunks: Vec<PaidChunk>) -> WaveResult {
+        const MAX_RETRIES: u32 = 3;
+        const BASE_DELAY_MS: u64 = 500;
+
+        let mut stored = Vec::new();
+        let mut to_retry = paid_chunks;
+
+        for attempt in 0..=MAX_RETRIES {
+            if attempt > 0 {
+                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
+                tokio::time::sleep(delay).await;
+                info!(
+                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
+                    to_retry.len()
+                );
+            }
-        results.into_iter().collect()
+            let results: Vec<(PaidChunk, Result<XorName>)> = stream::iter(to_retry)
+                .map(|chunk| {
+                    let chunk_clone = chunk.clone();
+                    async move {
+                        let result = self
+                            .chunk_put_to_close_group(
+                                chunk.content,
+                                chunk.proof_bytes,
+                                &chunk.quoted_peers,
+                            )
+                            .await;
+                        (chunk_clone, result)
+                    }
+                })
+                .buffer_unordered(self.config().chunk_concurrency)
+                .collect()
+                .await;
+
+            let mut failed_this_round = Vec::new();
+            for (chunk, result) in results {
+                match result {
+                    Ok(name) => stored.push(name),
+                    Err(e) => failed_this_round.push((chunk, e.to_string())),
+                }
+            }
+
+            if failed_this_round.is_empty() {
+                return WaveResult {
+                    stored,
+                    failed: Vec::new(),
+                };
+            }
+
+            if attempt == MAX_RETRIES {
+                let failed = failed_this_round
+                    .into_iter()
+                    .map(|(c, e)| (c.address, e))
+                    .collect();
+                return WaveResult { stored, failed };
+            }
+
+            warn!(
+                "{} chunks failed on attempt {}, will retry",
+                failed_this_round.len(),
+                attempt + 1
+            );
+            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
+        }
+
+        // Unreachable due to loop structure, but satisfy the compiler.
+        WaveResult {
+            stored,
+            failed: Vec::new(),
+        }
     }
 }
diff --git a/ant-core/src/data/client/data.rs b/ant-core/src/data/client/data.rs
index f9a3cae..e54b06c 100644
--- a/ant-core/src/data/client/data.rs
+++ b/ant-core/src/data/client/data.rs
@@ -6,7 +6,7 @@
 //! For file-based streaming uploads that avoid loading the entire
 //! file into memory, see the `file` module.
-use crate::data::client::batch::PaymentIntent; +use crate::data::client::batch::{PaymentIntent, PreparedChunk}; use crate::data::client::file::PreparedUpload; use crate::data::client::merkle::PaymentMode; use crate::data::client::Client; @@ -177,9 +177,16 @@ impl Client { .map(|chunk| chunk.content) .collect(); - let mut prepared_chunks = Vec::with_capacity(chunk_contents.len()); - for content in chunk_contents { - if let Some(prepared) = self.prepare_chunk_payment(content).await? { + let concurrency = self.config().chunk_concurrency; + let results: Vec>> = futures::stream::iter(chunk_contents) + .map(|content| async move { self.prepare_chunk_payment(content).await }) + .buffer_unordered(concurrency) + .collect() + .await; + + let mut prepared_chunks = Vec::with_capacity(results.len()); + for result in results { + if let Some(prepared) = result? { prepared_chunks.push(prepared); } } @@ -196,6 +203,7 @@ impl Client { data_map, prepared_chunks, payment_intent, + payment_mode: PaymentMode::Single, }) } diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs index dfa77dd..909c4bc 100644 --- a/ant-core/src/data/client/file.rs +++ b/ant-core/src/data/client/file.rs @@ -19,6 +19,7 @@ use ant_node::client::compute_address; use bytes::Bytes; use evmlib::common::QuoteHash; use evmlib::common::TxHash; +use fs2::FileExt; use futures::stream::{self, StreamExt}; use self_encryption::{stream_encrypt, streaming_decrypt, DataMap}; use std::collections::{HashMap, HashSet}; @@ -44,9 +45,21 @@ const DISK_SPACE_HEADROOM_PERCENT: u64 = 10; /// During file encryption, chunks are written to a temp directory so that /// only their 32-byte addresses stay in memory. At upload time chunks are /// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`. +/// Maximum age (in seconds) for orphaned spill directories. +/// Dirs older than this are cleaned up if they have no active lockfile. 
+const SPILL_MAX_AGE_SECS: u64 = 24 * 60 * 60; // 24 hours + +/// Prefix for spill directory names to distinguish from user files. +const SPILL_DIR_PREFIX: &str = "spill_"; + +/// Lockfile name inside each spill dir to signal active use. +const SPILL_LOCK_NAME: &str = ".lock"; + struct ChunkSpill { - /// Temp directory holding spilled chunk files (named by hex address). + /// Directory holding spilled chunk files (named by hex address). dir: PathBuf, + /// Lockfile held for the lifetime of this spill (prevents stale cleanup). + _lock: std::fs::File, /// Deduplicated list of chunk addresses. addresses: Vec<[u8; 32]>, /// Tracks seen addresses for deduplication. @@ -56,22 +69,147 @@ struct ChunkSpill { } impl ChunkSpill { - /// Create a new spill directory under the system temp dir. + /// Return the parent directory for all spill dirs: `/spill/`. + fn spill_root() -> Result { + use crate::config; + let root = config::data_dir() + .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))? + .join("spill"); + Ok(root) + } + + /// Create a new spill directory under `/spill/`. /// - /// Uses `create_dir` (not `create_dir_all`) so creation fails if the - /// directory already exists, preventing silent reuse of a stale spill. + /// Directory name is `spill__` so orphans can be + /// identified by prefix and cleaned up by age. A lockfile inside the + /// dir prevents concurrent cleanup from deleting an active spill. fn new() -> Result { + let root = Self::spill_root()?; + std::fs::create_dir_all(&root)?; + + // Clean up stale spill dirs from previous crashed runs. 
+ Self::cleanup_stale(&root); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); let unique: u64 = rand::random(); - let dir = std::env::temp_dir().join(format!(".ant_spill_{}_{unique}", std::process::id())); + let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}")); std::fs::create_dir(&dir)?; + + // Create and hold a lockfile for the lifetime of this spill. + // cleanup_stale() will skip dirs with locked files. + let lock_path = dir.join(SPILL_LOCK_NAME); + let lock_file = std::fs::File::create(&lock_path).map_err(|e| { + Error::Io(std::io::Error::new( + e.kind(), + format!("failed to create spill lockfile: {e}"), + )) + })?; + lock_file.try_lock_exclusive().map_err(|e| { + Error::Io(std::io::Error::new( + e.kind(), + format!("failed to lock spill lockfile: {e}"), + )) + })?; + Ok(Self { dir, + _lock: lock_file, addresses: Vec::new(), seen: HashSet::new(), total_bytes: 0, }) } + /// Clean up stale spill directories. Best-effort, errors are logged. + /// + /// Only removes directories that: + /// 1. Start with `SPILL_DIR_PREFIX` (ignores unrelated files) + /// 2. Are actual directories (not symlinks -- prevents symlink attacks) + /// 3. Have a timestamp older than `SPILL_MAX_AGE_SECS` + /// 4. Do NOT have an active lockfile (prevents deleting in-progress uploads) + /// + /// Safe to call concurrently from multiple processes. + fn cleanup_stale(root: &Path) { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + if now == 0 { + // Clock is broken (before Unix epoch). Skip cleanup to avoid + // misidentifying dirs as stale. 
+ warn!("System clock before Unix epoch, skipping spill cleanup"); + return; + } + + let entries = match std::fs::read_dir(root) { + Ok(entries) => entries, + Err(_) => return, + }; + + for entry in entries.flatten() { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + + // Only process dirs with our prefix. + let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) { + Some(s) => s, + None => continue, + }; + + // Parse timestamp: "spill__" + let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) { + Some(ts) => ts, + None => continue, + }; + + if now.saturating_sub(timestamp) <= SPILL_MAX_AGE_SECS { + continue; + } + + // Safety: only delete actual directories, not symlinks. + let file_type = match entry.file_type() { + Ok(ft) => ft, + Err(_) => continue, + }; + if !file_type.is_dir() { + continue; + } + + let path = entry.path(); + + // Check lockfile: if locked, the dir is in active use -- skip it. + let lock_path = path.join(SPILL_LOCK_NAME); + if let Ok(lock_file) = std::fs::File::open(&lock_path) { + use fs2::FileExt; + if lock_file.try_lock_exclusive().is_err() { + // Lock held by another process -- dir is active. + debug!("Skipping active spill dir: {}", path.display()); + continue; + } + // We acquired the lock, so no one else holds it. + // Drop it before deleting. + drop(lock_file); + } + + info!("Cleaning up stale spill dir: {}", path.display()); + if let Err(e) = std::fs::remove_dir_all(&path) { + warn!("Failed to clean up stale spill dir {}: {e}", path.display()); + } + } + } + + /// Run stale spill cleanup. Call at client startup or periodically. + #[allow(dead_code)] + pub(crate) fn run_cleanup() { + if let Ok(root) = Self::spill_root() { + Self::cleanup_stale(&root); + } + } + /// Write one encrypted chunk to disk and record its address. 
/// /// Deduplicates by content address: if the same chunk was already @@ -151,17 +289,23 @@ impl Drop for ChunkSpill { } } -/// Check that the temp directory has enough free space for the spilled chunks. +/// Check that the spill directory has enough free space for the spilled chunks. /// /// `file_size` is the source file's byte count. We require -/// `file_size + 10%` free space in the temp dir to account for -/// self-encryption overhead. +/// `file_size + 10%` free space to account for self-encryption overhead. fn check_disk_space_for_spill(file_size: u64) -> Result<()> { - let tmp = std::env::temp_dir(); - let available = fs2::available_space(&tmp).map_err(|e| { + let spill_root = ChunkSpill::spill_root()?; + + // Ensure the root exists so fs2 can query it. + std::fs::create_dir_all(&spill_root)?; + + let available = fs2::available_space(&spill_root).map_err(|e| { Error::Io(std::io::Error::new( e.kind(), - format!("failed to query disk space on {}: {e}", tmp.display()), + format!( + "failed to query disk space on {}: {e}", + spill_root.display() + ), )) })?; @@ -173,14 +317,14 @@ fn check_disk_space_for_spill(file_size: u64) -> Result<()> { let avail_mb = available / (1024 * 1024); let req_mb = required / (1024 * 1024); return Err(Error::InsufficientDiskSpace(format!( - "need ~{req_mb} MB in temp dir ({}) but only {avail_mb} MB available", - tmp.display() + "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available", + spill_root.display() ))); } debug!( - "Disk space check passed: {available} bytes available, {required} bytes required (temp: {})", - tmp.display() + "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})", + spill_root.display() ); Ok(()) } @@ -213,6 +357,8 @@ pub struct PreparedUpload { pub prepared_chunks: Vec, /// Payment intent for external signing. pub payment_intent: PaymentIntent, + /// The payment mode used for this upload. 
+ pub payment_mode: PaymentMode, } /// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle. @@ -352,22 +498,27 @@ impl Client { spill.len() ); - // Read each chunk from disk and collect quotes. Note: all PreparedChunks - // accumulate in memory because the external-signer protocol requires them - // for finalize_upload. This is NOT memory-bounded for large files. + // Read each chunk from disk and collect quotes concurrently. + // Note: all PreparedChunks accumulate in memory because the external-signer + // protocol requires them for finalize_upload. NOT memory-bounded for large files. + let chunk_data: Vec = spill + .addresses + .iter() + .map(|addr| spill.read_chunk(addr)) + .collect::, _>>()?; + + let concurrency = self.config().chunk_concurrency; + let results: Vec>> = stream::iter(chunk_data) + .map(|content| async move { self.prepare_chunk_payment(content).await }) + .buffer_unordered(concurrency) + .collect() + .await; + let mut prepared_chunks = Vec::with_capacity(spill.len()); - for (i, addr) in spill.addresses.iter().enumerate() { - let content = spill.read_chunk(addr)?; - if let Some(prepared) = self.prepare_chunk_payment(content).await? { + for result in results { + if let Some(prepared) = result? 
{ prepared_chunks.push(prepared); } - if (i + 1) % 100 == 0 { - info!( - "Prepared {}/{} chunks for external signing", - i + 1, - spill.len() - ); - } } let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks); @@ -383,6 +534,7 @@ impl Client { data_map, prepared_chunks, payment_intent, + payment_mode: PaymentMode::Single, }) } @@ -401,14 +553,25 @@ impl Client { tx_hash_map: &HashMap, ) -> Result { let paid_chunks = finalize_batch_payment(prepared.prepared_chunks, tx_hash_map)?; - let chunks_stored = self.store_paid_chunks(paid_chunks).await?.len(); + let wave_result = self.store_paid_chunks(paid_chunks).await; + if !wave_result.failed.is_empty() { + let failed_count = wave_result.failed.len(); + return Err(Error::PartialUpload { + stored: wave_result.stored.clone(), + stored_count: wave_result.stored.len(), + failed: wave_result.failed, + failed_count, + reason: "finalize_upload: chunk storage failed after retries".into(), + }); + } + let chunks_stored = wave_result.stored.len(); info!("External-signer upload finalized: {chunks_stored} chunks stored"); Ok(FileUploadResult { data_map: prepared.data_map, chunks_stored, - payment_mode_used: PaymentMode::Single, + payment_mode_used: prepared.payment_mode, }) } diff --git a/ant-core/src/data/client/merkle.rs b/ant-core/src/data/client/merkle.rs index 95c912a..e67b000 100644 --- a/ant-core/src/data/client/merkle.rs +++ b/ant-core/src/data/client/merkle.rs @@ -179,7 +179,7 @@ impl Client { info!("Generating merkle proofs for {chunk_count} chunks"); let mut proofs = HashMap::with_capacity(chunk_count); - for (i, xorname) in xornames.iter().enumerate() { + for (i, (xorname, address)) in xornames.iter().zip(addresses.iter()).enumerate() { let address_proof = tree.generate_address_proof(i, *xorname).map_err(|e| { Error::Payment(format!( "Failed to generate address proof for chunk {i}: {e}" @@ -193,7 +193,7 @@ impl Client { Error::Serialization(format!("Failed to serialize merkle proof: {e}")) })?; - 
proofs.insert(addresses[i], tagged_bytes); + proofs.insert(*address, tagged_bytes); } info!("Merkle batch payment complete: {chunk_count} proofs generated"); @@ -211,13 +211,36 @@ impl Client { data_type: u32, data_size: u64, ) -> Result { + let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect(); + let total_sub_batches = sub_batches.len(); let mut all_proofs = HashMap::with_capacity(addresses.len()); - for chunk in addresses.chunks(MAX_LEAVES) { - let sub_result = self + for (i, chunk) in sub_batches.into_iter().enumerate() { + match self .pay_for_merkle_single_batch(chunk, data_type, data_size) - .await?; - all_proofs.extend(sub_result.proofs); + .await + { + Ok(sub_result) => { + all_proofs.extend(sub_result.proofs); + } + Err(e) => { + if all_proofs.is_empty() { + // First sub-batch failed, nothing paid yet -- propagate directly. + return Err(e); + } + // Return partial result so caller can still store already-paid chunks. + warn!( + "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \ + Returning {} proofs from prior sub-batches", + i + 1, + all_proofs.len() + ); + return Ok(MerkleBatchPaymentResult { + chunk_count: all_proofs.len(), + proofs: all_proofs, + }); + } + } } Ok(MerkleBatchPaymentResult { diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 92c8f02..045f586 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -17,11 +17,26 @@ use futures::stream::{FuturesUnordered, StreamExt}; use std::time::Duration; use tracing::{debug, info, warn}; +/// Compute XOR distance between a peer's ID bytes and a target address. +/// +/// Uses the first 32 bytes of the peer ID (or fewer if shorter) XORed with +/// the target address. Returns a byte array suitable for lexicographic comparison. 
+fn xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] { + let peer_bytes = peer_id.as_bytes(); + let mut distance = [0u8; 32]; + for (i, d) in distance.iter_mut().enumerate() { + let pb = peer_bytes.get(i).copied().unwrap_or(0); + *d = pb ^ target[i]; + } + distance +} + impl Client { /// Get storage quotes from the closest peers for a given address. /// - /// Queries the DHT for the `CLOSE_GROUP_SIZE` closest peers and requests - /// quotes from each. Waits for all requests to complete or timeout. + /// Queries 2x `CLOSE_GROUP_SIZE` peers from the DHT for fault tolerance, + /// requests quotes from all of them concurrently, and returns the + /// `CLOSE_GROUP_SIZE` closest successful responders sorted by XOR distance. /// /// Returns `Error::AlreadyStored` early if `CLOSE_GROUP_MAJORITY` peers /// report the chunk is already stored. @@ -38,14 +53,17 @@ impl Client { ) -> Result, PaymentQuote, Amount)>> { let node = self.network().node(); + // Over-query for fault tolerance: ask 2x peers, keep closest successful ones. + let over_query_count = CLOSE_GROUP_SIZE * 2; debug!( - "Requesting {CLOSE_GROUP_SIZE} quotes for address {} (size: {data_size})", + "Requesting quotes from up to {over_query_count} peers for address {} (size: {data_size})", hex::encode(address) ); - // Query DHT and take only the CLOSE_GROUP_SIZE closest peers. - let mut remote_peers = self.close_group_peers(address).await?; - remote_peers.truncate(CLOSE_GROUP_SIZE); + let remote_peers = self + .network() + .find_closest_peers(address, over_query_count) + .await?; if remote_peers.len() < CLOSE_GROUP_SIZE { return Err(Error::InsufficientPeers(format!( @@ -135,10 +153,10 @@ impl Client { quote_futures.push(quote_future); } - // Wait for all quote requests to complete, with an overall timeout - // to prevent indefinite stalls when connection establishment hangs. 
- let mut quotes = Vec::with_capacity(CLOSE_GROUP_SIZE); - let mut already_stored_count = 0usize; + // Collect all responses with an overall timeout to prevent indefinite stalls. + // Over-query means we have 2x peers, so we can tolerate failures. + let mut quotes = Vec::with_capacity(over_query_count); + let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new(); let mut failures: Vec = Vec::new(); let collect_result: std::result::Result, _> = @@ -149,15 +167,9 @@ impl Client { quotes.push((peer_id, addrs, quote, price)); } Err(Error::AlreadyStored) => { - already_stored_count += 1; debug!("Peer {peer_id} reports chunk already stored"); - if already_stored_count >= CLOSE_GROUP_MAJORITY { - info!( - "Chunk {} already stored ({already_stored_count}/{CLOSE_GROUP_SIZE} peers confirm)", - hex::encode(address) - ); - return Err(Error::AlreadyStored); - } + let dist = xor_distance(&peer_id, address); + already_stored_peers.push((peer_id, dist)); } Err(e) => { warn!("Failed to get quote from {peer_id}: {e}"); @@ -172,26 +184,58 @@ impl Client { match collect_result { Err(_elapsed) => { warn!( - "Quote collection timed out after {:?} for address {}", - overall_timeout, + "Quote collection timed out after {overall_timeout:?} for address {}", hex::encode(address) ); - return Err(Error::Timeout(format!( - "Quote collection timed out after {:?}. Got {} quotes, need {CLOSE_GROUP_SIZE}. Failures: [{}]", - overall_timeout, - quotes.len(), - failures.join("; ") - ))); + // Fall through to check if we have enough quotes despite timeout. + // The timeout fires when slow peers haven't responded yet, but we + // may already have enough successful quotes from fast peers. + } + Ok(Err(e)) => return Err(e), + Ok(Ok(())) => {} + } + + // Check already-stored: only count votes from the closest CLOSE_GROUP_SIZE peers. 
+        if !already_stored_peers.is_empty() {
+            let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
+            for (peer_id, _, _, _) in &quotes {
+                all_peers_by_distance.push((false, xor_distance(peer_id, address)));
+            }
+            for (_, dist) in &already_stored_peers {
+                all_peers_by_distance.push((true, *dist));
+            }
+            all_peers_by_distance.sort_by(|a, b| a.1.cmp(&b.1));
+
+            let close_group_stored = all_peers_by_distance
+                .iter()
+                .take(CLOSE_GROUP_SIZE)
+                .filter(|(is_stored, _)| *is_stored)
+                .count();
+
+            if close_group_stored >= CLOSE_GROUP_MAJORITY {
+                info!(
+                    "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
+                    hex::encode(address)
+                );
+                return Err(Error::AlreadyStored);
             }
-            Ok(Err(e)) => return Err(e), // AlreadyStored early return
-            Ok(Ok(())) => {} // All futures completed normally
         }
 
         if quotes.len() >= CLOSE_GROUP_SIZE {
+            let total_responses = quotes.len() + failures.len() + already_stored_peers.len();
+
+            // Sort by XOR distance to target, keep the closest CLOSE_GROUP_SIZE.
+            quotes.sort_by(|a, b| {
+                let dist_a = xor_distance(&a.0, address);
+                let dist_b = xor_distance(&b.0, address);
+                dist_a.cmp(&dist_b)
+            });
+            quotes.truncate(CLOSE_GROUP_SIZE);
+
             info!(
-                "Collected {} quotes for address {}",
+                "Collected {} quotes for address {} (from {total_responses} responses)",
                 quotes.len(),
-                hex::encode(address)
+                hex::encode(address),
             );
             return Ok(quotes);
         }
diff --git a/ant-core/src/data/error.rs b/ant-core/src/data/error.rs
index 7d5d542..e571fa5 100644
--- a/ant-core/src/data/error.rs
+++ b/ant-core/src/data/error.rs
@@ -67,6 +67,23 @@ pub enum Error {
     /// Not enough disk space for the operation.
     #[error("insufficient disk space: {0}")]
     InsufficientDiskSpace(String),
+
+    /// Upload partially succeeded -- some chunks stored, some failed after retries.
+    ///
+ #[error("partial upload: {stored_count} stored, {failed_count} failed: {reason}")] + PartialUpload { + /// Addresses of successfully stored chunks. + stored: Vec<[u8; 32]>, + /// Number of successfully stored chunks. + stored_count: usize, + /// Addresses and error messages of chunks that failed after retries. + failed: Vec<([u8; 32], String)>, + /// Number of failed chunks. + failed_count: usize, + /// Root cause description. + reason: String, + }, } impl From for Error { diff --git a/ant-core/tests/e2e_huge_file.rs b/ant-core/tests/e2e_huge_file.rs index 6503707..cd7ccba 100644 --- a/ant-core/tests/e2e_huge_file.rs +++ b/ant-core/tests/e2e_huge_file.rs @@ -448,6 +448,96 @@ async fn test_huge_file_upload_download_4gb() { testnet.teardown().await; } +/// Upload and download an 8 GB file, proving memory stays bounded. +/// +/// 8 GB → ~2048 chunks → ~32 waves of 64 chunks each. +/// Memory usage should be comparable to the 1 GB and 4 GB tests. +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_huge_file_upload_download_8gb() { + let file_size: u64 = 8 * 1024 * 1024 * 1024; + + let (client, testnet) = setup_large().await; + let work_dir = TempDir::new().expect("create work dir"); + + eprintln!("Creating 8 GB test file..."); + let input_path = create_test_file(&work_dir, file_size); + eprintln!("Test file created at {}", input_path.display()); + + tokio::task::yield_now().await; + + let rss_before = get_current_rss_bytes(); + if let Some(rss) = rss_before { + eprintln!("RSS baseline before upload: {} MB", rss / (1024 * 1024)); + } + + eprintln!("Uploading 8 GB file..."); + let result = client + .file_upload(input_path.as_path()) + .await + .expect("8 GB file upload should succeed"); + + let rss_after_upload = get_current_rss_bytes(); + + eprintln!( + "Upload complete: {} chunks stored, mode: {:?}", + result.chunks_stored, result.payment_mode_used + ); + + // 8 GB at MAX_CHUNK_SIZE (4,190,208 bytes) → ~2053 chunks minimum + assert!( + 
result.chunks_stored >= 2000, + "8 GB file should produce at least 2000 chunks, got {}", + result.chunks_stored + ); + + if let (Some(before), Some(after)) = (rss_before, rss_after_upload) { + let increase = after.saturating_sub(before); + let increase_mb = increase / (1024 * 1024); + let max_mb = MAX_RSS_INCREASE_BYTES / (1024 * 1024); + eprintln!( + "Current RSS before: {} MB, after: {} MB, increase: {increase_mb} MB (limit: {max_mb} MB)", + before / (1024 * 1024), + after / (1024 * 1024) + ); + assert!( + increase < MAX_RSS_INCREASE_BYTES, + "RSS increased by {increase_mb} MB for an 8 GB file — \ + this exceeds the {max_mb} MB limit. \ + Before: {} MB, After: {} MB", + before / (1024 * 1024), + after / (1024 * 1024) + ); + } else { + #[cfg(any(target_os = "macos", target_os = "linux"))] + panic!("RSS measurement failed on a supported platform — cannot verify bounded memory"); + + #[cfg(not(any(target_os = "macos", target_os = "linux")))] + eprintln!("WARNING: Could not measure current RSS on this platform, skipping memory check"); + } + + // Download and verify + let output_path = work_dir.path().join("downloaded_8gb.bin"); + eprintln!("Downloading 8 GB file..."); + let bytes_written = client + .file_download(&result.data_map, &output_path) + .await + .expect("8 GB file download should succeed"); + + assert_eq!( + bytes_written, file_size, + "bytes_written should equal original file size" + ); + eprintln!("Download complete: {bytes_written} bytes written"); + + eprintln!("Verifying file content..."); + verify_file_content(&output_path, file_size); + eprintln!("Content verification passed — all 8 GB match"); + + drop(client); + testnet.teardown().await; +} + /// Test that a moderately large file (64 MB) round-trips correctly. /// /// This test verifies data integrity on a medium file. 
Memory bounding diff --git a/ant-core/tests/e2e_upload_costs.rs b/ant-core/tests/e2e_upload_costs.rs new file mode 100644 index 0000000..1c202b1 --- /dev/null +++ b/ant-core/tests/e2e_upload_costs.rs @@ -0,0 +1,289 @@ +//! Upload cost comparison: Merkle vs Single payment modes. +//! +//! Uploads files of various sizes using both payment modes and reports +//! the ANT token cost, gas cost, and chunk counts in a summary table. +//! +//! Run with: cargo test --release --test e2e_upload_costs -- --nocapture + +#![allow(clippy::unwrap_used, clippy::expect_used)] + +mod support; + +use ant_core::data::client::merkle::PaymentMode; +use ant_core::data::{Client, ClientConfig}; +use serial_test::serial; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use support::MiniTestnet; +use tempfile::TempDir; + +/// Simple xorshift64 PRNG for deterministic, incompressible test data. +struct Xorshift64(u64); + +impl Xorshift64 { + fn new(seed: u64) -> Self { + Self(seed) + } + + fn next_u8(&mut self) -> u8 { + self.0 ^= self.0 << 13; + self.0 ^= self.0 >> 7; + self.0 ^= self.0 << 17; + (self.0 & 0xFF) as u8 + } +} + +fn create_test_file(dir: &Path, size: u64, name: &str, seed: u64) -> PathBuf { + let path = dir.join(name); + let mut file = std::fs::File::create(&path).expect("create test file"); + + let mut rng = Xorshift64::new(seed); + let mut remaining = size; + let write_buf_size: usize = 1024 * 1024; + let mut buf = vec![0u8; write_buf_size]; + while remaining > 0 { + let to_write = remaining.min(write_buf_size as u64) as usize; + for byte in buf.iter_mut().take(to_write) { + *byte = rng.next_u8(); + } + file.write_all(&buf[..to_write]) + .expect("write chunk to test file"); + remaining -= to_write as u64; + } + file.flush().expect("flush test file"); + + path +} + +/// Result of a single upload cost measurement. 
+struct CostResult {
+    file_size_mb: u64,
+    mode: &'static str,
+    chunks: usize,
+    ant_cost_atto: u128,
+    gas_cost_wei: u128,
+    // Owned String (not Box::leak-ed &'static str) so each measurement
+    // doesn't leak an allocation.
+    num_evm_txs: String,
+}
+
+/// Measure ANT and gas cost for a single upload.
+async fn measure_upload_cost(
+    client: &Client,
+    wallet: &evmlib::wallet::Wallet,
+    path: &Path,
+    mode: PaymentMode,
+    file_size_mb: u64,
+) -> CostResult {
+    let ant_before = wallet
+        .balance_of_tokens()
+        .await
+        .expect("get ANT balance before");
+    let gas_before = wallet
+        .balance_of_gas_tokens()
+        .await
+        .expect("get gas balance before");
+
+    let result = client
+        .file_upload_with_mode(path, mode)
+        .await
+        .expect("upload should succeed");
+
+    let ant_after = wallet
+        .balance_of_tokens()
+        .await
+        .expect("get ANT balance after");
+    let gas_after = wallet
+        .balance_of_gas_tokens()
+        .await
+        .expect("get gas balance after");
+
+    let ant_spent = ant_before.saturating_sub(ant_after);
+    let gas_spent = gas_before.saturating_sub(gas_after);
+
+    // Estimate tx count based on mode and chunk count
+    let tx_estimate = match mode {
+        PaymentMode::Single => format!("~{}", result.chunks_stored.div_ceil(256)),
+        PaymentMode::Merkle => {
+            let sub_batches = result.chunks_stored.div_ceil(256);
+            format!("{sub_batches}")
+        }
+        PaymentMode::Auto => format!("{:?}", result.payment_mode_used),
+    };
+
+    CostResult {
+        file_size_mb,
+        mode: match mode {
+            PaymentMode::Single => "Single",
+            PaymentMode::Merkle => "Merkle",
+            PaymentMode::Auto => "Auto",
+        },
+        chunks: result.chunks_stored,
+        ant_cost_atto: ant_spent.to::<u128>(),
+        gas_cost_wei: gas_spent.to::<u128>(),
+        num_evm_txs: tx_estimate,
+    }
+}
+
+/// Format an atto-ANT amount for display (1 ANT = 10^18 atto).
+fn format_atto(atto: u128) -> String {
+    if atto == 0 {
+        return "0".to_string();
+    }
+    // 1 ANT = 10^18 atto
+    let whole = atto / 1_000_000_000_000_000_000;
+    let frac = atto % 1_000_000_000_000_000_000;
+    if whole > 0 {
+        format!("{whole}.{frac:018} ANT")
+    } else {
+        format!("{atto} atto")
+    }
+}
+
+/// Format a wei amount for display, in gwei when large enough.
+fn format_wei(wei: u128) -> String {
+    if wei == 0 {
+        return "0".to_string();
+    }
+    let gwei = wei / 1_000_000_000;
+    if gwei > 0 {
+        format!("{gwei} gwei")
+    } else {
+        format!("{wei} wei")
+    }
+}
+
+/// Print the cost results as a box-drawn summary table on stderr.
+fn print_table(results: &[CostResult]) {
+    eprintln!();
+    eprintln!("╔══════════╤══════════╤════════╤══════════════════════════╤══════════════════╤═══════════╗");
+    eprintln!("║ Size     │ Mode     │ Chunks │ ANT Cost                 │ Gas Cost         │ EVM Txs   ║");
+    eprintln!("╠══════════╪══════════╪════════╪══════════════════════════╪══════════════════╪═══════════╣");
+    for r in results {
+        let size = if r.file_size_mb >= 1024 {
+            format!("{} GB", r.file_size_mb / 1024)
+        } else {
+            format!("{} MB", r.file_size_mb)
+        };
+        eprintln!(
+            "║ {:<8} │ {:<8} │ {:>6} │ {:<24} │ {:<16} │ {:<9} ║",
+            size,
+            r.mode,
+            r.chunks,
+            format_atto(r.ant_cost_atto),
+            format_wei(r.gas_cost_wei),
+            r.num_evm_txs,
+        );
+    }
+    eprintln!("╚══════════╧══════════╧════════╧══════════════════════════╧══════════════════╧═══════════╝");
+    eprintln!();
+}
+
+/// Upload cost comparison table.
+///
+/// Uses smaller file sizes (10-500MB) for fast execution while still
+/// covering single-wave, multi-wave, and multi-batch-merkle scenarios.
+///
+/// Run with `--release` for 3-5x speedup:
+///   cargo test --release --test e2e_upload_costs -- --nocapture
+#[tokio::test(flavor = "multi_thread")]
+#[serial]
+async fn test_upload_cost_comparison() {
+    // Need 20+ nodes for merkle payment pools (CANDIDATES_PER_POOL = 16)
+    let testnet = MiniTestnet::start(20).await;
+    let node = testnet.node(3).expect("Node 3 should exist");
+    let client = Client::from_node(Arc::clone(&node), ClientConfig::default())
+        .with_wallet(testnet.wallet().clone());
+
+    let work_dir = TempDir::new().expect("create work dir");
+
+    // Sizes covering single-wave through multi-batch merkle scenarios.
+    //   10 MB  -> ~3 chunks    (tiny, below merkle threshold)
+    //   50 MB  -> ~13 chunks   (small, below 64-chunk wave)
+    //  200 MB  -> ~54 chunks   (near wave boundary)
+    //  500 MB  -> ~128 chunks  (multi-wave, near merkle 256-leaf limit)
+    //    2 GB  -> ~512 chunks  (multi-batch merkle: 2 sub-batches)
+    //    4 GB  -> ~1024 chunks (multi-batch merkle: 4 sub-batches)
+    let sizes: Vec<(u64, &str)> = vec![
+        (10, "10mb.bin"),
+        (50, "50mb.bin"),
+        (200, "200mb.bin"),
+        (500, "500mb.bin"),
+        (2048, "2gb.bin"),
+        (4096, "4gb.bin"),
+    ];
+
+    let mut results = Vec::new();
+
+    for (size_mb, filename) in &sizes {
+        let file_size = *size_mb * 1024 * 1024;
+        let wallet = testnet.wallet();
+
+        // Create separate files for single and merkle to avoid AlreadyStored
+        let single_name = format!("single_{filename}");
+        let merkle_name = format!("merkle_{filename}");
+
+        // Single payment mode
+        eprintln!("--- {size_mb} MB ---");
+        eprintln!("  Creating file for Single mode...");
+        let single_path = create_test_file(
+            work_dir.path(),
+            file_size,
+            &single_name,
+            0xDEAD_BEEF_0000_0000 + *size_mb,
+        );
+        eprintln!("  Uploading (Single)...");
+        let single_result =
+            measure_upload_cost(&client, wallet, &single_path, PaymentMode::Single, *size_mb).await;
+        eprintln!(
+            "  {} chunks, ANT: {}, Gas: {}",
+            single_result.chunks,
+            format_atto(single_result.ant_cost_atto),
+            format_wei(single_result.gas_cost_wei)
+        );
+        results.push(single_result);
+
+        // Merkle payment mode (merkle requires at least 2 chunks; the
+        // smallest size above yields ~3, so no skip is needed here)
+        eprintln!("  Creating file for Merkle mode...");
+        let merkle_path = create_test_file(
+            work_dir.path(),
+            file_size,
+            &merkle_name,
+            0xCAFE_BABE_0000_0000 + *size_mb,
+        );
+        eprintln!("  Uploading (Merkle)...");
+        let merkle_result =
+            measure_upload_cost(&client, wallet, &merkle_path, PaymentMode::Merkle, *size_mb).await;
+        eprintln!(
+            "  {} chunks, ANT: {}, Gas: {}",
+            merkle_result.chunks,
+            format_atto(merkle_result.ant_cost_atto),
+            format_wei(merkle_result.gas_cost_wei)
+        );
+        results.push(merkle_result);
+
+        eprintln!();
+    }
+
+    // Print the summary table
+    eprintln!("=== UPLOAD COST COMPARISON TABLE ===");
+    print_table(&results);
+
+    // Verify merkle is cheaper in gas for files with enough chunks.
+    // Results were pushed in (single, merkle) pairs, so chunk by 2.
+    for chunk in results.chunks(2) {
+        if let [single, merkle] = chunk {
+            if single.chunks >= 10 {
+                eprintln!(
+                    "Gas savings for {} MB: Single={}, Merkle={} ({}x reduction)",
+                    single.file_size_mb,
+                    format_wei(single.gas_cost_wei),
+                    format_wei(merkle.gas_cost_wei),
+                    if merkle.gas_cost_wei > 0 {
+                        single.gas_cost_wei / merkle.gas_cost_wei
+                    } else {
+                        0
+                    }
+                );
+            }
+        }
+    }
+
+    drop(client);
+    testnet.teardown().await;
+}