From 2486fa854e13f47ce0f438f3d1d653cd14e631ba Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 11 Mar 2026 13:29:46 +0900 Subject: [PATCH 1/6] feat: integrate self-encryption for streaming file encrypt/decrypt Replace plaintext file chunking with convergent encryption via the self_encryption crate (quantum-safe: ChaCha20-Poly1305 + BLAKE3). - Add src/client/self_encrypt.rs with streaming encrypt/decrypt, DataMap serialization, and public/private data mode support - Update CLI with --public upload flag and --datamap download option - Set plaintext chunk size to 4MB-4KB via .cargo/config.toml to leave headroom for AEAD tag and compression overhead - Point self_encryption dep at grumbach/post_quatum branch --- .cargo/config.toml | 8 + Cargo.toml | 3 + src/bin/saorsa-cli/cli.rs | 72 ++++- src/bin/saorsa-cli/main.rs | 228 +++++++------ src/client/mod.rs | 1 + src/client/self_encrypt.rs | 643 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 4 + src/storage/handler.rs | 9 +- 8 files changed, 847 insertions(+), 121 deletions(-) create mode 100644 .cargo/config.toml create mode 100644 src/client/self_encrypt.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..426d66fe --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,8 @@ +# Set the plaintext chunk size for self_encryption to leave headroom +# for encryption overhead (ChaCha20-Poly1305 AEAD tag: 16 bytes, +# Brotli framing: ~50 bytes). This ensures encrypted chunks always +# fit within saorsa-node's MAX_CHUNK_SIZE (4 MiB = 4,194,304 bytes). +# +# Value: 4 MiB - 4 KiB = 4,190,208 bytes (4 KiB headroom). 
+[env]
+MAX_CHUNK_SIZE = "4190208"
diff --git a/Cargo.toml b/Cargo.toml
index 87d84089..7d259a02 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,6 +53,9 @@ heed = "0.22"
 aes-gcm-siv = "0.11"
 hkdf = "0.12"
 
+# Self-encryption (convergent encryption + streaming)
+self_encryption = { git = "https://github.com/grumbach/self_encryption.git", branch = "post_quatum" }
+
 # Hashing (aligned with saorsa-core)
 blake3 = "1"
 
diff --git a/src/bin/saorsa-cli/cli.rs b/src/bin/saorsa-cli/cli.rs
index 18cd926c..02ba4b55 100644
--- a/src/bin/saorsa-cli/cli.rs
+++ b/src/bin/saorsa-cli/cli.rs
@@ -74,11 +74,18 @@ pub enum FileAction {
     Upload {
         /// Path to the file to upload.
         path: PathBuf,
+        /// Public mode: store the data map on the network (anyone with the
+        /// address can download). Default is private (data map saved locally).
+        #[arg(long)]
+        public: bool,
     },
     /// Download a file from the network.
     Download {
-        /// Hex-encoded manifest address (returned by upload).
-        address: String,
+        /// Hex-encoded address (public data map address or manifest address).
+        address: Option<String>,
+        /// Path to a local data map file (for private downloads).
+        #[arg(long)]
+        datamap: Option<PathBuf>,
         /// Output file path (defaults to stdout).
         #[arg(long, short)]
         output: Option<PathBuf>,
@@ -128,6 +135,67 @@ mod tests {
         assert!(cli.devnet_manifest.is_some());
     }
 
+    #[test]
+    fn test_parse_upload_public() {
+        let cli = Cli::try_parse_from([
+            "saorsa-cli",
+            "--bootstrap",
+            "127.0.0.1:10000",
+            "file",
+            "upload",
+            "--public",
+            "/tmp/test.txt",
+        ])
+        .unwrap();
+
+        if let CliCommand::File {
+            action: FileAction::Upload { public, ..
}, + } = cli.command + { + assert!(public, "Upload should be public"); + } else { + panic!("Expected File Upload"); + } + } + + #[test] + fn test_parse_download_with_datamap() { + let cli = Cli::try_parse_from([ + "saorsa-cli", + "--bootstrap", + "127.0.0.1:10000", + "file", + "download", + "--datamap", + "/tmp/my.datamap", + "--output", + "/tmp/out.bin", + ]) + .unwrap(); + + if let CliCommand::File { + action: + FileAction::Download { + address, + datamap, + output, + }, + } = cli.command + { + assert!( + address.is_none(), + "Address should be None for datamap download" + ); + assert_eq!( + datamap.as_deref(), + Some(std::path::Path::new("/tmp/my.datamap")) + ); + assert!(output.is_some()); + } else { + panic!("Expected File Download"); + } + } + #[test] fn test_secret_key_from_env() { // SECRET_KEY is read at runtime, not parsed by clap diff --git a/src/bin/saorsa-cli/main.rs b/src/bin/saorsa-cli/main.rs index 9d8d778f..bba69b9c 100644 --- a/src/bin/saorsa-cli/main.rs +++ b/src/bin/saorsa-cli/main.rs @@ -8,14 +8,15 @@ use cli::{ChunkAction, Cli, CliCommand, FileAction}; use evmlib::wallet::Wallet; use evmlib::Network as EvmNetwork; use saorsa_core::P2PNode; -use saorsa_node::ant_protocol::MAX_WIRE_MESSAGE_SIZE; -use saorsa_node::client::{ - create_manifest, deserialize_manifest, reassemble_file, serialize_manifest, split_file, - QuantumClient, QuantumConfig, XorName, +use saorsa_node::ant_protocol::{MAX_CHUNK_SIZE, MAX_WIRE_MESSAGE_SIZE}; +use saorsa_node::client::self_encrypt::{ + deserialize_data_map, download_and_decrypt_file, encrypt_and_upload_file, + fetch_data_map_public, serialize_data_map, store_data_map_public, }; +use saorsa_node::client::{QuantumClient, QuantumConfig, XorName}; use saorsa_node::devnet::DevnetManifest; use saorsa_node::error::Error; -use std::io::Read as _; +use std::io::{Read as _, Write as _}; use std::path::{Path, PathBuf}; use std::sync::Arc; use tracing::info; @@ -69,21 +70,33 @@ async fn main() -> color_eyre::Result<()> { }) 
.with_node(node); - if let Some(ref key) = private_key { - let network = resolve_evm_network(&cli.evm_network, manifest.as_ref())?; - let wallet = Wallet::new_from_private_key(network, key) - .map_err(|e| color_eyre::eyre::eyre!("Failed to create wallet: {e}"))?; - info!("Wallet configured for EVM payments"); - client = client.with_wallet(wallet); + if needs_wallet { + if let Some(ref key) = private_key { + let network = resolve_evm_network(&cli.evm_network, manifest.as_ref())?; + let wallet = Wallet::new_from_private_key(network, key) + .map_err(|e| color_eyre::eyre::eyre!("Failed to create wallet: {e}"))?; + info!("Wallet configured for EVM payments"); + client = client.with_wallet(wallet); + } } match cli.command { CliCommand::File { action } => match action { - FileAction::Upload { path } => { - handle_upload(&client, &path).await?; + FileAction::Upload { path, public } => { + handle_upload(&client, &path, public).await?; } - FileAction::Download { address, output } => { - handle_download(&client, &address, output.as_deref()).await?; + FileAction::Download { + address, + datamap, + output, + } => { + handle_download( + &client, + address.as_deref(), + datamap.as_deref(), + output.as_deref(), + ) + .await?; } }, CliCommand::Chunk { action } => match action { @@ -99,126 +112,99 @@ async fn main() -> color_eyre::Result<()> { Ok(()) } -async fn handle_upload(client: &QuantumClient, path: &Path) -> color_eyre::Result<()> { - let filename = path.file_name().and_then(|n| n.to_str()).map(String::from); - let file_content = std::fs::read(path)?; - let file_size = file_content.len(); - +async fn handle_upload( + client: &QuantumClient, + path: &Path, + public: bool, +) -> color_eyre::Result<()> { + let file_size = std::fs::metadata(path)?.len(); info!("Uploading file: {} ({file_size} bytes)", path.display()); - // Split file into chunks - let chunks = split_file(&file_content); - let chunk_count = chunks.len(); - info!("File split into {chunk_count} chunk(s)"); + // 
Encrypt and upload all chunks using streaming self-encryption
+    let (data_map, all_tx_hashes) = encrypt_and_upload_file(path, client).await?;
+    let chunk_count = data_map.chunk_identifiers.len();
+    let total_tx_count = all_tx_hashes.len();
 
-    // Upload each chunk with payment, collecting tx hashes
-    let mut chunk_addresses: Vec<[u8; 32]> = Vec::with_capacity(chunk_count);
-    let mut all_tx_hashes: Vec<String> = Vec::new();
+    if public {
+        // Public mode: store the DataMap on the network too
+        let (dm_address, dm_tx_hashes) = store_data_map_public(&data_map, client).await?;
+        let address_hex = hex::encode(dm_address);
+        let combined_tx = total_tx_count + dm_tx_hashes.len();
 
-    for (i, chunk) in chunks.into_iter().enumerate() {
-        let chunk_num = i + 1;
-        info!(
-            "Uploading chunk {chunk_num}/{chunk_count} ({} bytes)",
-            chunk.len()
-        );
-        let (address, tx_hashes) = client.put_chunk_with_payment(chunk).await?;
-        info!(
-            "Chunk {chunk_num}/{chunk_count} stored at {}",
-            hex::encode(address)
-        );
-        chunk_addresses.push(address);
-        for tx in &tx_hashes {
-            all_tx_hashes.push(format!("{tx:?}"));
-        }
-    }
+        println!("FILE_ADDRESS={address_hex}");
+        println!("MODE=public");
+        println!("CHUNKS={chunk_count}");
+        println!("TOTAL_SIZE={file_size}");
+        println!("PAYMENTS={combined_tx}");
 
-    // Create and upload manifest (also paid)
-    let total_size =
-        u64::try_from(file_size).map_err(|e| color_eyre::eyre::eyre!("File too large: {e}"))?;
-    let manifest = create_manifest(filename, total_size, chunk_addresses);
-    let manifest_bytes = serialize_manifest(&manifest)?;
-    let (manifest_address, manifest_tx_hashes) =
-        client.put_chunk_with_payment(manifest_bytes).await?;
-    for tx in &manifest_tx_hashes {
-        all_tx_hashes.push(format!("{tx:?}"));
-    }
+        let mut all = all_tx_hashes;
+        all.extend(dm_tx_hashes);
+        println!("TX_HASHES={}", all.join(","));
 
-    let manifest_hex = hex::encode(manifest_address);
-    let total_tx_count = all_tx_hashes.len();
-    let tx_hashes_str = all_tx_hashes.join(",");
-
-    //
Print results to stdout - println!("FILE_ADDRESS={manifest_hex}"); - println!("CHUNKS={chunk_count}"); - println!("TOTAL_SIZE={file_size}"); - println!("PAYMENTS={total_tx_count}"); - println!("TX_HASHES={tx_hashes_str}"); + info!("Upload complete (public): address={address_hex}, chunks={chunk_count}"); + } else { + // Private mode: save DataMap locally, never upload it + let data_map_bytes = serialize_data_map(&data_map)?; + let datamap_path = path.with_extension("datamap"); + std::fs::write(&datamap_path, &data_map_bytes)?; + + println!("DATAMAP_FILE={}", datamap_path.display()); + println!("DATAMAP_HEX={}", hex::encode(&data_map_bytes)); + println!("MODE=private"); + println!("CHUNKS={chunk_count}"); + println!("TOTAL_SIZE={file_size}"); + println!("PAYMENTS={total_tx_count}"); + println!("TX_HASHES={}", all_tx_hashes.join(",")); - info!( - "Upload complete: address={manifest_hex}, chunks={chunk_count}, payments={total_tx_count}" - ); + info!( + "Upload complete (private): datamap saved to {}, chunks={chunk_count}", + datamap_path.display() + ); + } Ok(()) } async fn handle_download( client: &QuantumClient, - address: &str, + address: Option<&str>, + datamap_path: Option<&Path>, output: Option<&Path>, ) -> color_eyre::Result<()> { - let manifest_address = parse_address(address)?; - info!("Downloading file from manifest {address}"); - - // Fetch manifest chunk - let manifest_chunk = client - .get_chunk(&manifest_address) - .await? 
- .ok_or_else(|| color_eyre::eyre::eyre!("Manifest chunk not found at {address}"))?; - - let manifest = deserialize_manifest(&manifest_chunk.content)?; - let chunk_count = manifest.chunk_addresses.len(); - info!( - "Manifest loaded: {} chunk(s), {} bytes total", - chunk_count, manifest.total_size + // Resolve the DataMap: either from network (public) or local file (private) + let data_map = if let Some(dm_path) = datamap_path { + info!("Loading DataMap from local file: {}", dm_path.display()); + let dm_bytes = std::fs::read(dm_path)?; + deserialize_data_map(&dm_bytes)? + } else if let Some(addr_str) = address { + let addr = parse_address(addr_str)?; + info!("Fetching DataMap from network: {addr_str}"); + fetch_data_map_public(&addr, client).await? + } else { + return Err(color_eyre::eyre::eyre!( + "Either an address or --datamap must be provided for download" + )); + }; + + let chunk_count = data_map.chunk_identifiers.len(); + info!("DataMap loaded: {chunk_count} chunk(s)"); + + // Determine output path + let output_path = output.map_or_else( + || PathBuf::from("downloaded_file"), + std::borrow::ToOwned::to_owned, ); - // Fetch all data chunks in order - let mut chunks = Vec::with_capacity(chunk_count); - for (i, chunk_addr) in manifest.chunk_addresses.iter().enumerate() { - let chunk_num = i + 1; - info!( - "Downloading chunk {chunk_num}/{chunk_count} ({})", - hex::encode(chunk_addr) - ); - let chunk = client.get_chunk(chunk_addr).await?.ok_or_else(|| { - color_eyre::eyre::eyre!("Data chunk not found: {}", hex::encode(chunk_addr)) - })?; - chunks.push(chunk.content); - } + download_and_decrypt_file(&data_map, &output_path, client).await?; - // Reassemble file - let file_content = reassemble_file(&manifest, &chunks)?; - info!("File reassembled: {} bytes", file_content.len()); - - // Write output - if let Some(path) = output { - std::fs::write(path, &file_content)?; - info!("File saved to {}", path.display()); - println!( - "Downloaded {} bytes to {}", - 
file_content.len(),
-            path.display()
-        );
-    } else {
-        use std::io::Write;
-        std::io::stdout().write_all(&file_content)?;
-    }
+    let file_size = std::fs::metadata(&output_path)?.len();
+    println!("Downloaded {file_size} bytes to {}", output_path.display());
 
     Ok(())
 }
 
 async fn handle_chunk_put(client: &QuantumClient, file: Option<PathBuf>) -> color_eyre::Result<()> {
-    let content = read_input(file)?;
+    let content = read_input(file.as_deref())?;
     info!("Storing single chunk ({} bytes)", content.len());
 
     let (address, tx_hashes) = client.put_chunk_with_payment(Bytes::from(content)).await?;
@@ -247,7 +233,6 @@ async fn handle_chunk_get(
             std::fs::write(&path, &chunk.content)?;
             info!("Chunk saved to {}", path.display());
         } else {
-            use std::io::Write;
             std::io::stdout().write_all(&chunk.content)?;
         }
     }
@@ -261,12 +246,25 @@ async fn handle_chunk_get(
     Ok(())
 }
 
-fn read_input(file: Option<PathBuf>) -> color_eyre::Result<Vec<u8>> {
+fn read_input(file: Option<&Path>) -> color_eyre::Result<Vec<u8>> {
     if let Some(path) = file {
+        let meta = std::fs::metadata(path)?;
+        if meta.len() > MAX_CHUNK_SIZE as u64 {
+            return Err(color_eyre::eyre::eyre!(
+                "Input file exceeds MAX_CHUNK_SIZE ({MAX_CHUNK_SIZE} bytes): {} bytes",
+                meta.len()
+            ));
+        }
         return Ok(std::fs::read(path)?);
     }
+    let limit = (MAX_CHUNK_SIZE + 1) as u64;
     let mut buf = Vec::new();
-    std::io::stdin().read_to_end(&mut buf)?;
+    std::io::stdin().take(limit).read_to_end(&mut buf)?;
+    if buf.len() > MAX_CHUNK_SIZE {
+        return Err(color_eyre::eyre::eyre!(
+            "Stdin input exceeds MAX_CHUNK_SIZE ({MAX_CHUNK_SIZE} bytes)"
+        ));
+    }
     Ok(buf)
 }
diff --git a/src/client/mod.rs b/src/client/mod.rs
index d161f9c2..15c80b7f 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -57,6 +57,7 @@ mod chunk_protocol;
 mod data_types;
 pub mod file_ops;
 mod quantum;
+pub mod self_encrypt;
 
 pub use chunk_protocol::send_and_await_chunk_response;
 pub use data_types::{
diff --git a/src/client/self_encrypt.rs b/src/client/self_encrypt.rs
new file mode 100644
index 00000000..86473fb9
--- /dev/null +++ b/src/client/self_encrypt.rs @@ -0,0 +1,643 @@ +//! Self-encryption integration for streaming file encrypt/decrypt. +//! +//! Wraps the `self_encryption` crate's streaming API to provide: +//! - **Streaming encryption** from file path (low memory footprint) +//! - **Streaming decryption** to file path +//! - **`DataMap` serialization** for public/private data modes +//! +//! ## Public vs Private Data +//! +//! - **Public**: `DataMap` is stored as a chunk on the network; anyone with +//! the `DataMap` address can reconstruct the file. +//! - **Private** (default): `DataMap` is returned to the caller and never +//! uploaded. Only the holder of the `DataMap` can access the file. + +use crate::client::quantum::QuantumClient; +use crate::error::{Error, Result}; +use bytes::Bytes; +use self_encryption::DataMap; +use std::collections::HashMap; +use std::hash::BuildHasher; +use std::io::{BufReader, Read, Write}; +use std::path::Path; +use tokio::runtime::Handle; +use tracing::{debug, info}; +use xor_name::XorName; + +use crate::client::data_types::XorName as ChunkAddress; + +/// Encrypt a file using streaming self-encryption and upload each chunk. +/// +/// Uses `stream_encrypt()` which reads the file incrementally via an iterator. +/// Chunks are collected in memory, then uploaded sequentially with payment. +/// +/// Returns the `DataMap` after all chunks are uploaded, plus the list of +/// transaction hash strings from payment. +/// +/// # Errors +/// +/// Returns an error if encryption fails, or any chunk upload fails. 
+pub async fn encrypt_and_upload_file(
+    file_path: &Path,
+    client: &QuantumClient,
+) -> Result<(DataMap, Vec<String>)> {
+    let metadata = std::fs::metadata(file_path).map_err(Error::Io)?;
+    let file_size: usize = metadata
+        .len()
+        .try_into()
+        .map_err(|_| Error::Crypto("File too large for this platform".into()))?;
+    info!(
+        "Encrypting file: {} ({file_size} bytes)",
+        file_path.display()
+    );
+
+    let (data_map, collected) = encrypt_file_to_chunks(file_path, file_size)?;
+
+    let chunk_count = collected.len();
+    info!("Encryption complete: {chunk_count} chunk(s) produced");
+
+    let mut all_tx_hashes: Vec<String> = Vec::new();
+    for (i, (hash, content)) in collected.into_iter().enumerate() {
+        let chunk_num = i + 1;
+        debug!("Uploading encrypted chunk {chunk_num}/{chunk_count}");
+        let (address, tx_hashes) = client.put_chunk_with_payment(content).await?;
+        if address != hash.0 {
+            return Err(Error::Crypto(format!(
+                "Hash mismatch for chunk {chunk_num}: self_encryption={} network={}",
+                hex::encode(hash.0),
+                hex::encode(address)
+            )));
+        }
+        all_tx_hashes.extend(tx_hashes.iter().map(|tx| format!("{tx:?}")));
+    }
+
+    info!("All {chunk_count} encrypted chunks uploaded");
+    Ok((data_map, all_tx_hashes))
+}
+
+/// Encrypt a file from disk using `stream_encrypt`, returning the `DataMap`
+/// and a list of `(XorName, Bytes)` encrypted chunks.
+fn encrypt_file_to_chunks( + file_path: &Path, + file_size: usize, +) -> Result<(DataMap, Vec<(XorName, Bytes)>)> { + let file = std::fs::File::open(file_path).map_err(Error::Io)?; + let mut reader = BufReader::new(file); + + let data_iter = std::iter::from_fn(move || { + let mut buf = vec![0u8; 65536]; + match reader.read(&mut buf) { + Ok(0) | Err(_) => None, + Ok(n) => { + buf.truncate(n); + Some(Bytes::from(buf)) + } + } + }); + + let mut stream = self_encryption::stream_encrypt(file_size, data_iter) + .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; + + let mut chunks = Vec::new(); + for chunk_result in stream.chunks() { + let (hash, content) = + chunk_result.map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; + chunks.push((hash, content)); + } + + let data_map = stream + .into_datamap() + .ok_or_else(|| Error::Crypto("DataMap not available after encryption".into()))?; + + Ok((data_map, chunks)) +} + +/// Download and decrypt a file given its `DataMap`. +/// +/// Uses `streaming_decrypt()` which yields decrypted chunks as an iterator, +/// fetching encrypted chunks on-demand in batches from the network. +/// Each batch is fetched via `block_in_place` + `Handle::block_on` to safely +/// bridge async network I/O into the sync callback that the self-encryption +/// crate requires. +/// +/// Memory usage is bounded by the batch size (default ~10 chunks), not +/// the total file size. +/// +/// # Errors +/// +/// Returns an error if any chunk cannot be fetched or decryption fails. 
+pub async fn download_and_decrypt_file(
+    data_map: &DataMap,
+    output_path: &Path,
+    client: &QuantumClient,
+) -> Result<()> {
+    let chunk_count = data_map.chunk_identifiers.len();
+    info!("Decrypting file: {chunk_count} chunk(s) to decrypt (fetching on-demand)");
+
+    let handle = Handle::current();
+
+    let stream = self_encryption::streaming_decrypt(data_map, |batch: &[(usize, XorName)]| {
+        let mut results = Vec::with_capacity(batch.len());
+        for &(idx, ref hash) in batch {
+            let addr = hash.0;
+            let addr_hex = hex::encode(addr);
+            debug!("Fetching chunk idx={idx} ({addr_hex})");
+            let chunk = tokio::task::block_in_place(|| handle.block_on(client.get_chunk(&addr)))
+                .map_err(|e| {
+                    self_encryption::Error::Generic(format!(
+                        "Network fetch failed for {addr_hex}: {e}"
+                    ))
+                })?
+                .ok_or_else(|| {
+                    self_encryption::Error::Generic(format!(
+                        "Chunk not found on network: {addr_hex}"
+                    ))
+                })?;
+            results.push((idx, chunk.content));
+        }
+        Ok(results)
+    })
+    .map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?;
+
+    let mut file = std::fs::File::create(output_path).map_err(Error::Io)?;
+    for chunk_result in stream {
+        let chunk_bytes =
+            chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?;
+        file.write_all(&chunk_bytes).map_err(Error::Io)?;
+    }
+
+    info!("Decryption complete: {}", output_path.display());
+    Ok(())
+}
+
+/// Serialize a `DataMap` to bytes using bincode (via `DataMap::to_bytes`).
+///
+/// # Errors
+///
+/// Returns an error if serialization fails.
+pub fn serialize_data_map(data_map: &DataMap) -> Result<Vec<u8>> {
+    data_map
+        .to_bytes()
+        .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))
+}
+
+/// Deserialize a `DataMap` from bytes.
+///
+/// # Errors
+///
+/// Returns an error if deserialization fails.
+pub fn deserialize_data_map(bytes: &[u8]) -> Result<DataMap> {
+    DataMap::from_bytes(bytes)
+        .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
+}
+
+/// Store a `DataMap` as a chunk on the network (public mode).
+///
+/// Serializes the `DataMap` and uploads it as a regular content-addressed chunk.
+/// Returns the address (BLAKE3 hash) of the stored `DataMap` chunk.
+///
+/// # Errors
+///
+/// Returns an error if serialization or upload fails.
+pub async fn store_data_map_public(
+    data_map: &DataMap,
+    client: &QuantumClient,
+) -> Result<(ChunkAddress, Vec<String>)> {
+    let data_map_bytes = serialize_data_map(data_map)?;
+    let content = Bytes::from(data_map_bytes);
+    let (address, tx_hashes) = client.put_chunk_with_payment(content).await?;
+    let tx_strs: Vec<String> = tx_hashes.iter().map(|tx| format!("{tx:?}")).collect();
+    let address_hex = hex::encode(address);
+    info!("DataMap stored publicly at {address_hex}");
+    Ok((address, tx_strs))
+}
+
+/// Retrieve a `DataMap` from the network (public mode).
+///
+/// Fetches the `DataMap` chunk by address and deserializes it.
+///
+/// # Errors
+///
+/// Returns an error if the chunk is not found or deserialization fails.
+pub async fn fetch_data_map_public(
+    address: &ChunkAddress,
+    client: &QuantumClient,
+) -> Result<DataMap> {
+    let chunk = client.get_chunk(address).await?.ok_or_else(|| {
+        Error::Storage(format!(
+            "DataMap chunk not found at {}",
+            hex::encode(address)
+        ))
+    })?;
+    deserialize_data_map(&chunk.content)
+}
+
+/// Encrypt a file to a local chunk store (no network). Useful for testing.
+///
+/// Returns the `DataMap` and a `HashMap` of `XorName` -> `Bytes` containing
+/// all encrypted chunks.
+///
+/// # Errors
+///
+/// Returns an error if the file cannot be read or encryption fails.
+pub fn encrypt_file_local(file_path: &Path) -> Result<(DataMap, HashMap<XorName, Bytes>)> {
+    let file_size: usize = std::fs::metadata(file_path)
+        .map_err(Error::Io)?
+        .len()
+        .try_into()
+        .map_err(|_| Error::Crypto("File too large for this platform".into()))?;
+    let (data_map, chunk_list) = encrypt_file_to_chunks(file_path, file_size)?;
+    let store: HashMap<XorName, Bytes> = chunk_list.into_iter().collect();
+    Ok((data_map, store))
+}
+
+/// Decrypt a file from a local chunk store (no network). Useful for testing.
+///
+/// # Errors
+///
+/// Returns an error if any chunk is missing or decryption fails.
+pub fn decrypt_file_local(
+    data_map: &DataMap,
+    chunk_store: &HashMap<XorName, Bytes>,
+    output_path: &Path,
+) -> Result<()> {
+    decrypt_from_store(data_map, chunk_store, output_path)
+}
+
+/// Shared helper: decrypt a `DataMap` using a chunk store `HashMap`.
+fn decrypt_from_store(
+    data_map: &DataMap,
+    chunk_store: &HashMap<XorName, Bytes>,
+    output_path: &Path,
+) -> Result<()> {
+    let stream = self_encryption::streaming_decrypt(data_map, |batch: &[(usize, XorName)]| {
+        let mut results = Vec::with_capacity(batch.len());
+        for &(idx, ref hash) in batch {
+            let content = chunk_store.get(hash).ok_or_else(|| {
+                self_encryption::Error::Generic(format!("Chunk not found: {hash:?}"))
+            })?;
+            results.push((idx, content.clone()));
+        }
+        Ok(results)
+    })
+    .map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?;
+
+    let mut file = std::fs::File::create(output_path).map_err(Error::Io)?;
+    for chunk_result in stream {
+        let chunk_bytes =
+            chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?;
+        file.write_all(&chunk_bytes).map_err(Error::Io)?;
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+#[allow(clippy::unwrap_used, clippy::expect_used)]
+mod tests {
+    use super::*;
+    use crate::client::data_types::compute_address;
+    use std::io::Write;
+
+    fn create_temp_file(content: &[u8]) -> tempfile::NamedTempFile {
+        let mut file = tempfile::NamedTempFile::new().unwrap();
+        file.write_all(content).unwrap();
+        file.flush().unwrap();
+        file
+    }
+
+    #[test]
+    fn test_encrypt_decrypt_roundtrip_small() {
+        let original = vec![0xABu8; 4096];
+        let input_file =
create_temp_file(&original);
+
+        let (data_map, store) = encrypt_file_local(input_file.path()).unwrap();
+        assert!(
+            !data_map.chunk_identifiers.is_empty(),
+            "DataMap should have chunk identifiers"
+        );
+
+        let output_file = tempfile::NamedTempFile::new().unwrap();
+        decrypt_file_local(&data_map, &store, output_file.path()).unwrap();
+
+        let decrypted = std::fs::read(output_file.path()).unwrap();
+        assert_eq!(decrypted, original);
+    }
+
+    #[test]
+    fn test_encrypt_decrypt_roundtrip_medium() {
+        let original: Vec<u8> = (0u8..=255).cycle().take(1_048_576).collect();
+        let input_file = create_temp_file(&original);
+
+        let (data_map, store) = encrypt_file_local(input_file.path()).unwrap();
+
+        let output_file = tempfile::NamedTempFile::new().unwrap();
+        decrypt_file_local(&data_map, &store, output_file.path()).unwrap();
+
+        let decrypted = std::fs::read(output_file.path()).unwrap();
+        assert_eq!(decrypted, original);
+    }
+
+    #[test]
+    fn test_encrypt_produces_encrypted_output() {
+        let original = b"This is a known plaintext pattern for testing encryption";
+        let mut data = Vec::new();
+        for _ in 0..100 {
+            data.extend_from_slice(original);
+        }
+        let input_file = create_temp_file(&data);
+
+        let (_data_map, store) = encrypt_file_local(input_file.path()).unwrap();
+
+        let plaintext_str = "This is a known plaintext pattern";
+        for content in store.values() {
+            let chunk_str = String::from_utf8_lossy(content);
+            assert!(
+                !chunk_str.contains(plaintext_str),
+                "Encrypted chunk should not contain plaintext"
+            );
+        }
+    }
+
+    #[test]
+    fn test_data_map_serialization_roundtrip() {
+        let original = vec![0xCDu8; 8192];
+        let input_file = create_temp_file(&original);
+
+        let (data_map, _store) = encrypt_file_local(input_file.path()).unwrap();
+
+        let serialized = serialize_data_map(&data_map).unwrap();
+        let deserialized = deserialize_data_map(&serialized).unwrap();
+
+        assert_eq!(
+            data_map.chunk_identifiers.len(),
+            deserialized.chunk_identifiers.len()
+        );
+
assert_eq!(data_map.child, deserialized.child); + } + + #[test] + fn test_data_map_contains_correct_chunk_count() { + let original = vec![0xEFu8; 1_048_576]; + let input_file = create_temp_file(&original); + + let (data_map, store) = encrypt_file_local(input_file.path()).unwrap(); + + assert!( + data_map.chunk_identifiers.len() >= 3, + "Should have at least 3 chunk identifiers, got {}", + data_map.chunk_identifiers.len() + ); + + for info in &data_map.chunk_identifiers { + assert!( + store.contains_key(&info.dst_hash), + "Chunk store should contain chunk referenced by DataMap" + ); + } + } + + #[test] + fn test_encrypted_chunks_have_valid_addresses() { + let original = vec![0x42u8; 8192]; + let input_file = create_temp_file(&original); + + let (data_map, store) = encrypt_file_local(input_file.path()).unwrap(); + + for info in &data_map.chunk_identifiers { + let content = store.get(&info.dst_hash).expect("Chunk should exist"); + let computed = compute_address(content); + assert_eq!( + computed, info.dst_hash.0, + "BLAKE3(encrypted_content) should equal dst_hash" + ); + } + } + + #[test] + fn test_decryption_fails_without_correct_data_map() { + let original = vec![0x11u8; 8192]; + let input_file = create_temp_file(&original); + let (_data_map, store) = encrypt_file_local(input_file.path()).unwrap(); + + let other = vec![0x22u8; 8192]; + let other_file = create_temp_file(&other); + let (wrong_data_map, _) = encrypt_file_local(other_file.path()).unwrap(); + + let output_file = tempfile::NamedTempFile::new().unwrap(); + let result = decrypt_file_local(&wrong_data_map, &store, output_file.path()); + + assert!(result.is_err(), "Decryption with wrong DataMap should fail"); + } + + #[test] + fn test_cannot_recover_data_from_chunks_alone() { + let original = vec![0x33u8; 8192]; + let input_file = create_temp_file(&original); + let (_data_map, store) = encrypt_file_local(input_file.path()).unwrap(); + + let mut concatenated = Vec::new(); + for content in store.values() { + 
concatenated.extend_from_slice(content); + } + + assert_ne!( + concatenated, original, + "Concatenated chunks should not match original data" + ); + } + + #[test] + fn test_chunks_do_not_contain_plaintext_patterns() { + let pattern = b"SENTINEL_PATTERN_12345"; + let mut data = Vec::with_capacity(pattern.len() * 500); + for _ in 0..500 { + data.extend_from_slice(pattern); + } + let input_file = create_temp_file(&data); + + let (_data_map, store) = encrypt_file_local(input_file.path()).unwrap(); + + for content in store.values() { + let found = content + .windows(pattern.len()) + .any(|window| window == pattern); + assert!( + !found, + "Encrypted chunks must not contain plaintext patterns" + ); + } + } + + #[test] + fn test_missing_chunk_fails_decryption() { + let original = vec![0x44u8; 8192]; + let input_file = create_temp_file(&original); + let (data_map, mut store) = encrypt_file_local(input_file.path()).unwrap(); + + if let Some(info) = data_map.chunk_identifiers.first() { + store.remove(&info.dst_hash); + } + + let output_file = tempfile::NamedTempFile::new().unwrap(); + let result = decrypt_file_local(&data_map, &store, output_file.path()); + + assert!( + result.is_err(), + "Decryption should fail with a missing chunk" + ); + } + + #[test] + fn test_tampered_chunk_detected() { + let original = vec![0x55u8; 8192]; + let input_file = create_temp_file(&original); + let (data_map, mut store) = encrypt_file_local(input_file.path()).unwrap(); + + if let Some(info) = data_map.chunk_identifiers.first() { + if let Some(content) = store.get_mut(&info.dst_hash) { + let mut tampered = content.to_vec(); + if let Some(byte) = tampered.first_mut() { + *byte ^= 0xFF; + } + *content = Bytes::from(tampered); + } + } + + let output_file = tempfile::NamedTempFile::new().unwrap(); + let result = decrypt_file_local(&data_map, &store, output_file.path()); + + assert!( + result.is_err(), + "Decryption should fail with tampered chunk" + ); + } + + #[test] + fn 
test_wrong_data_map_fails_decryption() { + let original_a = vec![0x66u8; 8192]; + let file_a = create_temp_file(&original_a); + let (data_map_a, _store_a) = encrypt_file_local(file_a.path()).unwrap(); + + let original_b = vec![0x77u8; 8192]; + let file_b = create_temp_file(&original_b); + let (_data_map_b, store_b) = encrypt_file_local(file_b.path()).unwrap(); + + let output_file = tempfile::NamedTempFile::new().unwrap(); + let result = decrypt_file_local(&data_map_a, &store_b, output_file.path()); + + assert!( + result.is_err(), + "Decryption with mismatched DataMap should fail" + ); + } + + #[test] + #[ignore = "Requires ugly_files/kad.pdf to be present"] + fn test_encrypt_decrypt_pdf() { + let pdf_path = Path::new("ugly_files/kad.pdf"); + if !pdf_path.exists() { + return; + } + let original = std::fs::read(pdf_path).unwrap(); + + let (data_map, store) = encrypt_file_local(pdf_path).unwrap(); + + let output_file = tempfile::NamedTempFile::new().unwrap(); + decrypt_file_local(&data_map, &store, output_file.path()).unwrap(); + + let decrypted = std::fs::read(output_file.path()).unwrap(); + assert_eq!(decrypted, original); + } + + #[test] + #[ignore = "Requires ugly_files/pylon.mp4 to be present"] + fn test_encrypt_decrypt_video() { + let video_path = Path::new("ugly_files/pylon.mp4"); + if !video_path.exists() { + return; + } + let original = std::fs::read(video_path).unwrap(); + + let (data_map, store) = encrypt_file_local(video_path).unwrap(); + + let output_file = tempfile::NamedTempFile::new().unwrap(); + decrypt_file_local(&data_map, &store, output_file.path()).unwrap(); + + let decrypted = std::fs::read(output_file.path()).unwrap(); + assert_eq!(decrypted, original); + } + + #[test] + fn test_file_below_min_encryptable_bytes_fails() { + // Files smaller than MIN_ENCRYPTABLE_BYTES (3 bytes) should fail + let tiny = create_temp_file(&[0xAA, 0xBB]); + let result = encrypt_file_local(tiny.path()); + assert!( + result.is_err(), + "Encryption of 2-byte file should 
fail (below MIN_ENCRYPTABLE_BYTES)" + ); + } + + #[test] + fn test_encrypt_at_minimum_size() { + // Exactly MIN_ENCRYPTABLE_BYTES = 3 bytes should succeed + let data = vec![0xAAu8; 3]; + let input_file = create_temp_file(&data); + + let (data_map, store) = encrypt_file_local(input_file.path()).unwrap(); + assert!( + !data_map.chunk_identifiers.is_empty(), + "DataMap should have chunk identifiers for 3-byte file" + ); + + let output_file = tempfile::NamedTempFile::new().unwrap(); + decrypt_file_local(&data_map, &store, output_file.path()).unwrap(); + + let decrypted = std::fs::read(output_file.path()).unwrap(); + assert_eq!(decrypted, data); + } + + #[test] + fn test_deterministic_encryption() { + // Same content should produce the same DataMap and chunks + let data = vec![0xDDu8; 8192]; + let file_a = create_temp_file(&data); + let file_b = create_temp_file(&data); + + let (data_map_a, store_a) = encrypt_file_local(file_a.path()).unwrap(); + let (data_map_b, store_b) = encrypt_file_local(file_b.path()).unwrap(); + + assert_eq!( + data_map_a.chunk_identifiers.len(), + data_map_b.chunk_identifiers.len(), + "Same content should produce same number of chunks" + ); + + for (a, b) in data_map_a + .chunk_identifiers + .iter() + .zip(data_map_b.chunk_identifiers.iter()) + { + assert_eq!(a.dst_hash, b.dst_hash, "Chunk addresses should match"); + } + + // Verify stores have the same keys + for key in store_a.keys() { + assert!( + store_b.contains_key(key), + "Both stores should contain the same chunk addresses" + ); + } + } + + #[test] + fn test_empty_file_fails() { + let empty = create_temp_file(&[]); + let result = encrypt_file_local(empty.path()); + assert!( + result.is_err(), + "Encryption of empty file should fail (below MIN_ENCRYPTABLE_BYTES)" + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index e67fed63..ff03f566 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,6 +54,10 @@ pub use ant_protocol::{ ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, 
ChunkPutRequest, ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE, }; +pub use client::self_encrypt::{ + decrypt_file_local, deserialize_data_map, download_and_decrypt_file, encrypt_and_upload_file, + encrypt_file_local, fetch_data_map_public, serialize_data_map, store_data_map_public, +}; pub use client::{ compute_address, create_manifest, deserialize_manifest, peer_id_to_xor_name, reassemble_file, serialize_manifest, split_file, xor_distance, DataChunk, FileManifest, QuantumClient, diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 55b7f188..936f2064 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -31,7 +31,8 @@ use crate::ant_protocol::{ ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK, MAX_CHUNK_SIZE, }; -use crate::error::Result; +use crate::client::compute_address; +use crate::error::{Error, Result}; use crate::payment::{PaymentVerifier, QuoteGenerator}; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; @@ -93,7 +94,7 @@ impl AntProtocol { /// Returns an error if message decoding or handling fails. pub async fn handle_message(&self, data: &[u8]) -> Result { let message = ChunkMessage::decode(data) - .map_err(|e| crate::error::Error::Protocol(format!("Failed to decode message: {e}")))?; + .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?; let request_id = message.request_id; @@ -124,7 +125,7 @@ impl AntProtocol { response .encode() .map(Bytes::from) - .map_err(|e| crate::error::Error::Protocol(format!("Failed to encode response: {e}"))) + .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}"))) } /// Handle a PUT request. @@ -142,7 +143,7 @@ impl AntProtocol { } // 2. 
Verify content address matches BLAKE3(content) - let computed = crate::client::compute_address(&request.content); + let computed = compute_address(&request.content); if computed != address { return ChunkPutResponse::Error(ProtocolError::AddressMismatch { expected: address, From 33da91d46dcf7f11dd4e29408d7f89da1429ae2b Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 11 Mar 2026 13:39:33 +0900 Subject: [PATCH 2/6] fix: address PR review comments for self-encryption - Propagate I/O errors during file read instead of silently treating them as EOF - Remove DATAMAP_HEX from stdout in private mode (security leak) - Write decryption output to temp file, rename atomically on success - Fix misleading low memory footprint docs - Remove test-only functions from public API re-exports --- src/bin/saorsa-cli/main.rs | 1 - src/client/self_encrypt.rs | 76 ++++++++++++++++++++++++++++++-------- src/lib.rs | 4 +- 3 files changed, 63 insertions(+), 18 deletions(-) diff --git a/src/bin/saorsa-cli/main.rs b/src/bin/saorsa-cli/main.rs index bba69b9c..c95ca72a 100644 --- a/src/bin/saorsa-cli/main.rs +++ b/src/bin/saorsa-cli/main.rs @@ -149,7 +149,6 @@ async fn handle_upload( std::fs::write(&datamap_path, &data_map_bytes)?; println!("DATAMAP_FILE={}", datamap_path.display()); - println!("DATAMAP_HEX={}", hex::encode(&data_map_bytes)); println!("MODE=private"); println!("CHUNKS={chunk_count}"); println!("TOTAL_SIZE={file_size}"); diff --git a/src/client/self_encrypt.rs b/src/client/self_encrypt.rs index 86473fb9..0faaa474 100644 --- a/src/client/self_encrypt.rs +++ b/src/client/self_encrypt.rs @@ -1,8 +1,8 @@ -//! Self-encryption integration for streaming file encrypt/decrypt. +//! Self-encryption integration for file encrypt/decrypt. //! //! Wraps the `self_encryption` crate's streaming API to provide: -//! - **Streaming encryption** from file path (low memory footprint) -//! - **Streaming decryption** to file path +//! 
- **Encryption** from file path (chunks are collected in memory before upload) +//! - **Streaming decryption** to file path (bounded memory via batch fetching) //! - **`DataMap` serialization** for public/private data modes //! //! ## Public vs Private Data @@ -29,7 +29,8 @@ use crate::client::data_types::XorName as ChunkAddress; /// Encrypt a file using streaming self-encryption and upload each chunk. /// /// Uses `stream_encrypt()` which reads the file incrementally via an iterator. -/// Chunks are collected in memory, then uploaded sequentially with payment. +/// All encrypted chunks are collected in memory before uploading sequentially +/// with payment, so peak memory usage is proportional to the encrypted file size. /// /// Returns the `DataMap` after all chunks are uploaded, plus the list of /// transaction hash strings from payment. @@ -84,20 +85,37 @@ fn encrypt_file_to_chunks( let file = std::fs::File::open(file_path).map_err(Error::Io)?; let mut reader = BufReader::new(file); + let io_error: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(None)); + let io_error_writer = std::sync::Arc::clone(&io_error); + let data_iter = std::iter::from_fn(move || { let mut buf = vec![0u8; 65536]; match reader.read(&mut buf) { - Ok(0) | Err(_) => None, + Ok(0) => None, Ok(n) => { buf.truncate(n); Some(Bytes::from(buf)) } + Err(e) => { + if let Ok(mut guard) = io_error_writer.lock() { + *guard = Some(e); + } + None + } } }); let mut stream = self_encryption::stream_encrypt(file_size, data_iter) .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; + // Check if an I/O error was captured during iteration + if let Ok(guard) = io_error.lock() { + if let Some(ref e) = *guard { + return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string()))); + } + } + let mut chunks = Vec::new(); for chunk_result in stream.chunks() { let (hash, content) = @@ -159,13 +177,28 @@ pub async fn download_and_decrypt_file( }) .map_err(|e| 
Error::Crypto(format!("Decryption failed: {e}")))?; - let mut file = std::fs::File::create(output_path).map_err(Error::Io)?; - for chunk_result in stream { - let chunk_bytes = - chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; - file.write_all(&chunk_bytes).map_err(Error::Io)?; + // Write to a temp file first, then rename atomically on success + // to prevent leaving a corrupt partial file on failure. + let parent = output_path.parent().unwrap_or_else(|| Path::new(".")); + let tmp_path = parent.join(format!(".saorsa_decrypt_{}.tmp", std::process::id())); + + let result = (|| -> Result<()> { + let mut file = std::fs::File::create(&tmp_path).map_err(Error::Io)?; + for chunk_result in stream { + let chunk_bytes = + chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; + file.write_all(&chunk_bytes).map_err(Error::Io)?; + } + Ok(()) + })(); + + if let Err(e) = result { + let _ = std::fs::remove_file(&tmp_path); + return Err(e); } + std::fs::rename(&tmp_path, output_path).map_err(Error::Io)?; + info!("Decryption complete: {}", output_path.display()); Ok(()) } @@ -282,13 +315,26 @@ fn decrypt_from_store( }) .map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; - let mut file = std::fs::File::create(output_path).map_err(Error::Io)?; - for chunk_result in stream { - let chunk_bytes = - chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; - file.write_all(&chunk_bytes).map_err(Error::Io)?; + let parent = output_path.parent().unwrap_or_else(|| Path::new(".")); + let tmp_path = parent.join(format!(".saorsa_decrypt_{}.tmp", std::process::id())); + + let result = (|| -> Result<()> { + let mut file = std::fs::File::create(&tmp_path).map_err(Error::Io)?; + for chunk_result in stream { + let chunk_bytes = + chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; + file.write_all(&chunk_bytes).map_err(Error::Io)?; + } + Ok(()) + })(); + + if let Err(e) = result { + let _ = 
std::fs::remove_file(&tmp_path); + return Err(e); } + std::fs::rename(&tmp_path, output_path).map_err(Error::Io)?; + Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index ff03f566..cd07b28a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,8 +55,8 @@ pub use ant_protocol::{ ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE, }; pub use client::self_encrypt::{ - decrypt_file_local, deserialize_data_map, download_and_decrypt_file, encrypt_and_upload_file, - encrypt_file_local, fetch_data_map_public, serialize_data_map, store_data_map_public, + deserialize_data_map, download_and_decrypt_file, encrypt_and_upload_file, + fetch_data_map_public, serialize_data_map, store_data_map_public, }; pub use client::{ compute_address, create_manifest, deserialize_manifest, peer_id_to_xor_name, reassemble_file, From 6ccebcad75a78c23680de0617e98f95e6bc47e99 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 11 Mar 2026 17:19:47 +0900 Subject: [PATCH 3/6] fix: propagate I/O errors, unique temp files, safe runtime bridging, drain uploads on failure - Move read_error check to after chunk iteration (not just after stream_encrypt) so mid-stream I/O failures are caught instead of silently truncating data - Add randomness to temp file names to prevent collisions in concurrent calls - Reject CurrentThread runtime explicitly instead of deadlocking on block_on - Drain in-flight upload futures on failure, collecting tx_hashes from succeeded uploads and logging them before returning the error - Collect tx_hashes on hash-mismatch path too (was previously discarded) - Log temp file cleanup errors instead of silently swallowing them --- src/client/self_encrypt.rs | 272 ++++++++++++++++++++++++++++--------- 1 file changed, 208 insertions(+), 64 deletions(-) diff --git a/src/client/self_encrypt.rs b/src/client/self_encrypt.rs index 0faaa474..c89b6bdc 100644 --- a/src/client/self_encrypt.rs +++ b/src/client/self_encrypt.rs @@ -1,7 +1,7 @@ //! 
Self-encryption integration for file encrypt/decrypt. //! //! Wraps the `self_encryption` crate's streaming API to provide: -//! - **Encryption** from file path (chunks are collected in memory before upload) +//! - **Streaming encryption** with bounded-memory concurrent upload //! - **Streaming decryption** to file path (bounded memory via batch fetching) //! - **`DataMap` serialization** for public/private data modes //! @@ -12,25 +12,29 @@ //! - **Private** (default): `DataMap` is returned to the caller and never //! uploaded. Only the holder of the `DataMap` can access the file. +use crate::client::data_types::XorName as ChunkAddress; use crate::client::quantum::QuantumClient; use crate::error::{Error, Result}; use bytes::Bytes; +use futures::stream::{FuturesUnordered, StreamExt}; use self_encryption::DataMap; use std::collections::HashMap; use std::hash::BuildHasher; use std::io::{BufReader, Read, Write}; use std::path::Path; +use std::sync::{Arc, Mutex}; use tokio::runtime::Handle; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; use xor_name::XorName; -use crate::client::data_types::XorName as ChunkAddress; +/// Maximum number of concurrent chunk uploads. +const UPLOAD_CONCURRENCY: usize = 4; -/// Encrypt a file using streaming self-encryption and upload each chunk. +/// Encrypt a file using streaming self-encryption and upload chunks concurrently. /// -/// Uses `stream_encrypt()` which reads the file incrementally via an iterator. -/// All encrypted chunks are collected in memory before uploading sequentially -/// with payment, so peak memory usage is proportional to the encrypted file size. +/// Chunks are streamed lazily from the encryption iterator and uploaded with +/// bounded parallelism (`UPLOAD_CONCURRENCY` uploads in flight at once). +/// Peak memory is bounded by the concurrency limit, not the file size. /// /// Returns the `DataMap` after all chunks are uploaded, plus the list of /// transaction hash strings from payment. 
@@ -38,6 +42,7 @@ use crate::client::data_types::XorName as ChunkAddress; /// # Errors /// /// Returns an error if encryption fails, or any chunk upload fails. +#[allow(clippy::too_many_lines)] pub async fn encrypt_and_upload_file( file_path: &Path, client: &QuantumClient, @@ -52,27 +57,127 @@ pub async fn encrypt_and_upload_file( file_path.display() ); - let (data_map, collected) = encrypt_file_to_chunks(file_path, file_size)?; + let file = std::fs::File::open(file_path).map_err(Error::Io)?; + let mut reader = BufReader::new(file); + let read_error: Arc>> = Arc::new(Mutex::new(None)); + let read_error_writer = Arc::clone(&read_error); + let data_iter = std::iter::from_fn(move || { + let mut buf = vec![0u8; 65536]; + match reader.read(&mut buf) { + Ok(0) => None, + Err(e) => { + if let Ok(mut guard) = read_error_writer.lock() { + *guard = Some(e); + } + None + } + Ok(n) => { + buf.truncate(n); + Some(Bytes::from(buf)) + } + } + }); - let chunk_count = collected.len(); - info!("Encryption complete: {chunk_count} chunk(s) produced"); + let mut stream = self_encryption::stream_encrypt(file_size, data_iter) + .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; let mut all_tx_hashes: Vec = Vec::new(); - for (i, (hash, content)) in collected.into_iter().enumerate() { - let chunk_num = i + 1; - debug!("Uploading encrypted chunk {chunk_num}/{chunk_count}"); - let (address, tx_hashes) = client.put_chunk_with_payment(content).await?; - if address != hash.0 { - return Err(Error::Crypto(format!( - "Hash mismatch for chunk {chunk_num}: self_encryption={} network={}", - hex::encode(hash.0), - hex::encode(address) - ))); + let mut chunk_num: usize = 0; + + { + let mut in_flight = FuturesUnordered::new(); + let mut chunks_iter = stream.chunks(); + let mut iter_exhausted = false; + + loop { + // Fill up to UPLOAD_CONCURRENCY uploads + while !iter_exhausted && in_flight.len() < UPLOAD_CONCURRENCY { + match chunks_iter.next() { + Some(chunk_result) => { + let (hash, 
content) = chunk_result + .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; + chunk_num += 1; + let num = chunk_num; + debug!("Uploading encrypted chunk {num}"); + let fut = async move { + let result = client.put_chunk_with_payment(content).await; + (num, hash, result) + }; + in_flight.push(fut); + } + None => { + iter_exhausted = true; + } + } + } + + if in_flight.is_empty() { + break; + } + + // Await the next completed upload + let (num, hash, result) = in_flight + .next() + .await + .ok_or_else(|| Error::Crypto("Upload stream unexpectedly empty".into()))?; + match result { + Ok((address, tx_hashes)) => { + if address != hash.0 { + // Drain remaining in-flight futures before returning + let mut succeeded = all_tx_hashes.len(); + while let Some((_, _, res)) = in_flight.next().await { + if let Ok((_, txs)) = res { + all_tx_hashes.extend(txs.iter().map(|tx| format!("{tx:?}"))); + succeeded += 1; + } + } + if succeeded > 0 { + warn!( + "{succeeded} chunk(s) already uploaded before hash mismatch on chunk {num}; \ + tx_hashes so far: {all_tx_hashes:?}" + ); + } + return Err(Error::Crypto(format!( + "Hash mismatch for chunk {num}: self_encryption={} network={}", + hex::encode(hash.0), + hex::encode(address) + ))); + } + all_tx_hashes.extend(tx_hashes.iter().map(|tx| format!("{tx:?}"))); + } + Err(e) => { + // Drain remaining in-flight futures so we don't lose paid chunks + let mut succeeded = all_tx_hashes.len(); + while let Some((_, _, res)) = in_flight.next().await { + if let Ok((_, txs)) = res { + all_tx_hashes.extend(txs.iter().map(|tx| format!("{tx:?}"))); + succeeded += 1; + } + } + if succeeded > 0 { + warn!( + "{succeeded} chunk(s) already uploaded successfully before failure on chunk {num}; \ + tx_hashes so far: {all_tx_hashes:?}" + ); + } + return Err(e); + } + } + } + } + + // Check if the data iterator encountered an I/O error during chunk iteration + if let Ok(guard) = read_error.lock() { + if let Some(ref e) = *guard { + return 
Err(Error::Io(std::io::Error::new(e.kind(), format!("{e}")))); } - all_tx_hashes.extend(tx_hashes.iter().map(|tx| format!("{tx:?}"))); } - info!("All {chunk_count} encrypted chunks uploaded"); + let data_map = stream + .into_datamap() + .ok_or_else(|| Error::Crypto("DataMap not available after encryption".into()))?; + + info!("All {chunk_num} encrypted chunks uploaded"); Ok((data_map, all_tx_hashes)) } @@ -84,38 +189,28 @@ fn encrypt_file_to_chunks( ) -> Result<(DataMap, Vec<(XorName, Bytes)>)> { let file = std::fs::File::open(file_path).map_err(Error::Io)?; let mut reader = BufReader::new(file); - - let io_error: std::sync::Arc>> = - std::sync::Arc::new(std::sync::Mutex::new(None)); - let io_error_writer = std::sync::Arc::clone(&io_error); - + let read_error: Arc>> = Arc::new(Mutex::new(None)); + let read_error_writer = Arc::clone(&read_error); let data_iter = std::iter::from_fn(move || { let mut buf = vec![0u8; 65536]; match reader.read(&mut buf) { Ok(0) => None, - Ok(n) => { - buf.truncate(n); - Some(Bytes::from(buf)) - } Err(e) => { - if let Ok(mut guard) = io_error_writer.lock() { + if let Ok(mut guard) = read_error_writer.lock() { *guard = Some(e); } None } + Ok(n) => { + buf.truncate(n); + Some(Bytes::from(buf)) + } } }); let mut stream = self_encryption::stream_encrypt(file_size, data_iter) .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; - // Check if an I/O error was captured during iteration - if let Ok(guard) = io_error.lock() { - if let Some(ref e) = *guard { - return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string()))); - } - } - let mut chunks = Vec::new(); for chunk_result in stream.chunks() { let (hash, content) = @@ -123,6 +218,13 @@ fn encrypt_file_to_chunks( chunks.push((hash, content)); } + // Check if the data iterator encountered an I/O error during chunk iteration + if let Ok(guard) = read_error.lock() { + if let Some(ref e) = *guard { + return Err(Error::Io(std::io::Error::new(e.kind(), format!("{e}")))); + } + 
} + let data_map = stream .into_datamap() .ok_or_else(|| Error::Crypto("DataMap not available after encryption".into()))?; @@ -134,9 +236,13 @@ fn encrypt_file_to_chunks( /// /// Uses `streaming_decrypt()` which yields decrypted chunks as an iterator, /// fetching encrypted chunks on-demand in batches from the network. -/// Each batch is fetched via `block_in_place` + `Handle::block_on` to safely -/// bridge async network I/O into the sync callback that the self-encryption -/// crate requires. +/// +/// The sync callback required by `streaming_decrypt` bridges to async via +/// `block_in_place` + `block_on`, but fetches all chunks in each batch +/// concurrently using `FuturesUnordered`. This means each `block_on` call +/// resolves an entire batch in parallel rather than fetching one chunk at a +/// time, reducing thread pool contention to one blocking call per batch +/// instead of one per chunk. /// /// Memory usage is bounded by the batch size (default ~10 chunks), not /// the total file size. @@ -155,32 +261,56 @@ pub async fn download_and_decrypt_file( let handle = Handle::current(); let stream = self_encryption::streaming_decrypt(data_map, |batch: &[(usize, XorName)]| { - let mut results = Vec::with_capacity(batch.len()); - for &(idx, ref hash) in batch { - let addr = hash.0; - let addr_hex = hex::encode(addr); - debug!("Fetching chunk idx={idx} ({addr_hex})"); - let chunk = tokio::task::block_in_place(|| handle.block_on(client.get_chunk(&addr))) - .map_err(|e| { - self_encryption::Error::Generic(format!( - "Network fetch failed for {addr_hex}: {e}" - )) - })? - .ok_or_else(|| { - self_encryption::Error::Generic(format!( - "Chunk not found on network: {addr_hex}" - )) - })?; - results.push((idx, chunk.content)); + let batch_owned: Vec<(usize, XorName)> = batch.to_vec(); + + // block_in_place panics on current_thread runtime, and handle.block_on + // deadlocks there. Reject unsupported runtime flavors explicitly. 
+ if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::CurrentThread { + return Err(self_encryption::Error::Generic( + "download_and_decrypt_file requires a multi_thread tokio runtime".into(), + )); } - Ok(results) + tokio::task::block_in_place(|| { + handle.block_on(async { + let mut futs = FuturesUnordered::new(); + for (idx, hash) in batch_owned { + let addr = hash.0; + futs.push(async move { + let result = client.get_chunk(&addr).await; + (idx, hash, result) + }); + } + + let mut results = Vec::with_capacity(futs.len()); + while let Some((idx, hash, result)) = futs.next().await { + let addr_hex = hex::encode(hash.0); + let chunk = result + .map_err(|e| { + self_encryption::Error::Generic(format!( + "Network fetch failed for {addr_hex}: {e}" + )) + })? + .ok_or_else(|| { + self_encryption::Error::Generic(format!( + "Chunk not found on network: {addr_hex}" + )) + })?; + results.push((idx, chunk.content)); + } + Ok(results) + }) + }) }) .map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; // Write to a temp file first, then rename atomically on success // to prevent leaving a corrupt partial file on failure. 
let parent = output_path.parent().unwrap_or_else(|| Path::new(".")); - let tmp_path = parent.join(format!(".saorsa_decrypt_{}.tmp", std::process::id())); + let unique: u64 = rand::random(); + let tmp_path = parent.join(format!( + ".saorsa_decrypt_{}_{unique}.tmp", + std::process::id() + )); let result = (|| -> Result<()> { let mut file = std::fs::File::create(&tmp_path).map_err(Error::Io)?; @@ -193,7 +323,12 @@ pub async fn download_and_decrypt_file( })(); if let Err(e) = result { - let _ = std::fs::remove_file(&tmp_path); + if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) { + warn!( + "Failed to remove temp file {}: {cleanup_err}", + tmp_path.display() + ); + } return Err(e); } @@ -316,7 +451,11 @@ fn decrypt_from_store( .map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; let parent = output_path.parent().unwrap_or_else(|| Path::new(".")); - let tmp_path = parent.join(format!(".saorsa_decrypt_{}.tmp", std::process::id())); + let unique: u64 = rand::random(); + let tmp_path = parent.join(format!( + ".saorsa_decrypt_{}_{unique}.tmp", + std::process::id() + )); let result = (|| -> Result<()> { let mut file = std::fs::File::create(&tmp_path).map_err(Error::Io)?; @@ -329,7 +468,12 @@ fn decrypt_from_store( })(); if let Err(e) = result { - let _ = std::fs::remove_file(&tmp_path); + if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) { + warn!( + "Failed to remove temp file {}: {cleanup_err}", + tmp_path.display() + ); + } return Err(e); } From 5823054e7c00ccbd80bbebe4c7273aa163d2abbe Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Mar 2026 13:14:52 +0900 Subject: [PATCH 4/6] fix: address PR #23 review comments for self-encryption - Remove dead .cargo/config.toml (MAX_CHUNK_SIZE env var had no effect) - Extract open_encrypt_stream() to deduplicate file-read + encrypt setup - Extract write_stream_to_file() to deduplicate atomic-write-and-rename - Fix succeeded counter: track uploaded chunks, not tx hashes - Capture tx_hashes before 
hash-mismatch early return - Add READ_BUFFER_SIZE constant, reuse buffer across iterator calls - Handle Windows rename-over-existing-file edge case - Pin self_encryption dep to immutable commit rev - Add backward-compat comment on legacy file_ops re-exports --- .cargo/config.toml | 8 -- Cargo.toml | 2 +- src/client/self_encrypt.rs | 240 +++++++++++++++++-------------------- src/lib.rs | 2 + 4 files changed, 116 insertions(+), 136 deletions(-) delete mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index 426d66fe..00000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,8 +0,0 @@ -# Set the plaintext chunk size for self_encryption to leave headroom -# for encryption overhead (ChaCha20-Poly1305 AEAD tag: 16 bytes, -# Brotli framing: ~50 bytes). This ensures encrypted chunks always -# fit within saorsa-node's MAX_CHUNK_SIZE (4 MiB = 4,194,304 bytes). -# -# Value: 4 MiB - 4 KiB = 4,190,208 bytes (4 KiB headroom). -[env] -MAX_CHUNK_SIZE = "4190208" diff --git a/Cargo.toml b/Cargo.toml index 7d259a02..ee7e4f4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ aes-gcm-siv = "0.11" hkdf = "0.12" # Self-encryption (convergent encryption + streaming) -self_encryption = { git = "https://github.com/grumbach/self_encryption.git", branch = "post_quatum" } +self_encryption = { git = "https://github.com/grumbach/self_encryption.git", rev = "95ea1a716834" } # Hashing (aligned with saorsa-core) blake3 = "1" diff --git a/src/client/self_encrypt.rs b/src/client/self_encrypt.rs index c89b6bdc..f028ea98 100644 --- a/src/client/self_encrypt.rs +++ b/src/client/self_encrypt.rs @@ -30,6 +30,103 @@ use xor_name::XorName; /// Maximum number of concurrent chunk uploads. const UPLOAD_CONCURRENCY: usize = 4; +/// Size of the read buffer used when streaming file data into the encryptor. +const READ_BUFFER_SIZE: usize = 64 * 1024; + +/// Shared error capture used by `open_encrypt_stream`. 
+type ReadErrorCapture = Arc>>; + +/// Open a file and produce a streaming encryption iterator. +/// +/// Returns the encrypted-chunk stream and an error capture that should be +/// checked after iteration completes. +#[allow(clippy::type_complexity)] +fn open_encrypt_stream( + file_path: &Path, + file_size: usize, +) -> Result<( + self_encryption::EncryptionStream>, + ReadErrorCapture, +)> { + let file = std::fs::File::open(file_path).map_err(Error::Io)?; + let mut reader = BufReader::new(file); + let read_error: Arc>> = Arc::new(Mutex::new(None)); + let read_error_writer = Arc::clone(&read_error); + let mut buf = vec![0u8; READ_BUFFER_SIZE]; + let data_iter = std::iter::from_fn(move || match reader.read(&mut buf) { + Ok(0) => None, + Err(e) => { + if let Ok(mut guard) = read_error_writer.lock() { + *guard = Some(e); + } + None + } + Ok(n) => Some(Bytes::copy_from_slice(&buf[..n])), + }); + + // NOTE: Chunk size headroom for encryption overhead is managed by the + // self_encryption crate itself. See self_encryption::MAX_CHUNK_SIZE. + let stream = self_encryption::stream_encrypt(file_size, data_iter) + .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?; + + Ok((stream, read_error)) +} + +/// Check whether the read-error capture from `open_encrypt_stream` recorded +/// an I/O error during iteration. +fn check_read_error(read_error: &ReadErrorCapture) -> Result<()> { + if let Ok(guard) = read_error.lock() { + if let Some(ref e) = *guard { + return Err(Error::Io(std::io::Error::new(e.kind(), format!("{e}")))); + } + } + Ok(()) +} + +/// Write a stream of decrypted chunks to a file atomically. +/// +/// Writes to a temporary file first, then renames on success. +/// Cleans up the temp file on error. 
+fn write_stream_to_file( + stream: impl Iterator>, + output_path: &Path, +) -> Result<()> { + let parent = output_path.parent().unwrap_or_else(|| Path::new(".")); + let unique: u64 = rand::random(); + let tmp_path = parent.join(format!( + ".saorsa_decrypt_{}_{unique}.tmp", + std::process::id() + )); + + let result = (|| -> Result<()> { + let mut file = std::fs::File::create(&tmp_path).map_err(Error::Io)?; + for chunk_result in stream { + let chunk_bytes = + chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; + file.write_all(&chunk_bytes).map_err(Error::Io)?; + } + Ok(()) + })(); + + if let Err(e) = result { + if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) { + warn!( + "Failed to remove temp file {}: {cleanup_err}", + tmp_path.display() + ); + } + return Err(e); + } + + // On Windows, rename fails if destination exists. Remove it first. + if output_path.exists() { + std::fs::remove_file(output_path).map_err(Error::Io)?; + } + std::fs::rename(&tmp_path, output_path).map_err(Error::Io)?; + + Ok(()) +} + /// Encrypt a file using streaming self-encryption and upload chunks concurrently. 
 ///
 /// Chunks are streamed lazily from the encryption iterator and uploaded with
@@ -57,32 +154,11 @@ pub async fn encrypt_and_upload_file(
         file_path.display()
     );
 
-    let file = std::fs::File::open(file_path).map_err(Error::Io)?;
-    let mut reader = BufReader::new(file);
-    let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
-    let read_error_writer = Arc::clone(&read_error);
-    let data_iter = std::iter::from_fn(move || {
-        let mut buf = vec![0u8; 65536];
-        match reader.read(&mut buf) {
-            Ok(0) => None,
-            Err(e) => {
-                if let Ok(mut guard) = read_error_writer.lock() {
-                    *guard = Some(e);
-                }
-                None
-            }
-            Ok(n) => {
-                buf.truncate(n);
-                Some(Bytes::from(buf))
-            }
-        }
-    });
-
-    let mut stream = self_encryption::stream_encrypt(file_size, data_iter)
-        .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?;
+    let (mut stream, read_error) = open_encrypt_stream(file_path, file_size)?;
 
     let mut all_tx_hashes: Vec<String> = Vec::new();
     let mut chunk_num: usize = 0;
+    let mut uploaded_chunks: usize = 0;
 
     {
         let mut in_flight = FuturesUnordered::new();
@@ -122,18 +198,20 @@ pub async fn encrypt_and_upload_file(
             .ok_or_else(|| Error::Crypto("Upload stream unexpectedly empty".into()))?;
         match result {
             Ok((address, tx_hashes)) => {
+                // Always capture tx hashes, even on mismatch
+                all_tx_hashes.extend(tx_hashes.iter().map(|tx| format!("{tx:?}")));
+                uploaded_chunks += 1;
                 if address != hash.0 {
                     // Drain remaining in-flight futures before returning
-                    let mut succeeded = all_tx_hashes.len();
                     while let Some((_, _, res)) = in_flight.next().await {
                         if let Ok((_, txs)) = res {
                             all_tx_hashes.extend(txs.iter().map(|tx| format!("{tx:?}")));
-                            succeeded += 1;
+                            uploaded_chunks += 1;
                         }
                     }
-                    if succeeded > 0 {
+                    if uploaded_chunks > 0 {
                         warn!(
-                            "{succeeded} chunk(s) already uploaded before hash mismatch on chunk {num}; \
+                            "{uploaded_chunks} chunk(s) already uploaded before hash mismatch on chunk {num}; \
                              tx_hashes so far: {all_tx_hashes:?}"
                         );
                     }
@@ -143,20 +221,18 @@ pub async fn
encrypt_and_upload_file(
                         hex::encode(address)
                     )));
                 }
-                all_tx_hashes.extend(tx_hashes.iter().map(|tx| format!("{tx:?}")));
             }
             Err(e) => {
                 // Drain remaining in-flight futures so we don't lose paid chunks
-                let mut succeeded = all_tx_hashes.len();
                 while let Some((_, _, res)) = in_flight.next().await {
                     if let Ok((_, txs)) = res {
                         all_tx_hashes.extend(txs.iter().map(|tx| format!("{tx:?}")));
-                        succeeded += 1;
+                        uploaded_chunks += 1;
                     }
                 }
-                if succeeded > 0 {
+                if uploaded_chunks > 0 {
                     warn!(
-                        "{succeeded} chunk(s) already uploaded successfully before failure on chunk {num}; \
+                        "{uploaded_chunks} chunk(s) already uploaded successfully before failure on chunk {num}; \
                          tx_hashes so far: {all_tx_hashes:?}"
                     );
                 }
@@ -166,12 +242,7 @@ pub async fn encrypt_and_upload_file(
         }
     }
 
-    // Check if the data iterator encountered an I/O error during chunk iteration
-    if let Ok(guard) = read_error.lock() {
-        if let Some(ref e) = *guard {
-            return Err(Error::Io(std::io::Error::new(e.kind(), format!("{e}"))));
-        }
-    }
+    check_read_error(&read_error)?;
 
     let data_map = stream
         .into_datamap()
@@ -187,29 +258,7 @@ fn encrypt_file_to_chunks(
     file_path: &Path,
     file_size: usize,
 ) -> Result<(DataMap, Vec<(XorName, Bytes)>)> {
-    let file = std::fs::File::open(file_path).map_err(Error::Io)?;
-    let mut reader = BufReader::new(file);
-    let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
-    let read_error_writer = Arc::clone(&read_error);
-    let data_iter = std::iter::from_fn(move || {
-        let mut buf = vec![0u8; 65536];
-        match reader.read(&mut buf) {
-            Ok(0) => None,
-            Err(e) => {
-                if let Ok(mut guard) = read_error_writer.lock() {
-                    *guard = Some(e);
-                }
-                None
-            }
-            Ok(n) => {
-                buf.truncate(n);
-                Some(Bytes::from(buf))
-            }
-        }
-    });
-
-    let mut stream = self_encryption::stream_encrypt(file_size, data_iter)
-        .map_err(|e| Error::Crypto(format!("Self-encryption failed: {e}")))?;
+    let (mut stream, read_error) = open_encrypt_stream(file_path, file_size)?;
 
     let mut chunks = Vec::new();
     for chunk_result in
stream.chunks() { @@ -218,12 +267,7 @@ fn encrypt_file_to_chunks( chunks.push((hash, content)); } - // Check if the data iterator encountered an I/O error during chunk iteration - if let Ok(guard) = read_error.lock() { - if let Some(ref e) = *guard { - return Err(Error::Io(std::io::Error::new(e.kind(), format!("{e}")))); - } - } + check_read_error(&read_error)?; let data_map = stream .into_datamap() @@ -303,36 +347,7 @@ pub async fn download_and_decrypt_file( }) .map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; - // Write to a temp file first, then rename atomically on success - // to prevent leaving a corrupt partial file on failure. - let parent = output_path.parent().unwrap_or_else(|| Path::new(".")); - let unique: u64 = rand::random(); - let tmp_path = parent.join(format!( - ".saorsa_decrypt_{}_{unique}.tmp", - std::process::id() - )); - - let result = (|| -> Result<()> { - let mut file = std::fs::File::create(&tmp_path).map_err(Error::Io)?; - for chunk_result in stream { - let chunk_bytes = - chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; - file.write_all(&chunk_bytes).map_err(Error::Io)?; - } - Ok(()) - })(); - - if let Err(e) = result { - if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) { - warn!( - "Failed to remove temp file {}: {cleanup_err}", - tmp_path.display() - ); - } - return Err(e); - } - - std::fs::rename(&tmp_path, output_path).map_err(Error::Io)?; + write_stream_to_file(stream, output_path)?; info!("Decryption complete: {}", output_path.display()); Ok(()) @@ -450,36 +465,7 @@ fn decrypt_from_store( }) .map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; - let parent = output_path.parent().unwrap_or_else(|| Path::new(".")); - let unique: u64 = rand::random(); - let tmp_path = parent.join(format!( - ".saorsa_decrypt_{}_{unique}.tmp", - std::process::id() - )); - - let result = (|| -> Result<()> { - let mut file = std::fs::File::create(&tmp_path).map_err(Error::Io)?; - for 
chunk_result in stream { - let chunk_bytes = - chunk_result.map_err(|e| Error::Crypto(format!("Decryption failed: {e}")))?; - file.write_all(&chunk_bytes).map_err(Error::Io)?; - } - Ok(()) - })(); - - if let Err(e) = result { - if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) { - warn!( - "Failed to remove temp file {}: {cleanup_err}", - tmp_path.display() - ); - } - return Err(e); - } - - std::fs::rename(&tmp_path, output_path).map_err(Error::Io)?; - - Ok(()) + write_stream_to_file(stream, output_path) } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index cd07b28a..b08e49f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,8 @@ pub use client::self_encrypt::{ deserialize_data_map, download_and_decrypt_file, encrypt_and_upload_file, fetch_data_map_public, serialize_data_map, store_data_map_public, }; +// Legacy plaintext chunking API — retained for backward compatibility with +// data already stored on the network without self-encryption. pub use client::{ compute_address, create_manifest, deserialize_manifest, peer_id_to_xor_name, reassemble_file, serialize_manifest, split_file, xor_distance, DataChunk, FileManifest, QuantumClient, From a032af4d2dc26854da5b78216ed82e7fc3bc4232 Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Mar 2026 15:22:28 +0900 Subject: [PATCH 5/6] fix: revert self_encryption pin to branch, add clarifying comment --- Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ee7e4f4d..ce14c307 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,8 @@ aes-gcm-siv = "0.11" hkdf = "0.12" # Self-encryption (convergent encryption + streaming) -self_encryption = { git = "https://github.com/grumbach/self_encryption.git", rev = "95ea1a716834" } +# Branch name "post_quatum" is intentional (matches the upstream branch). 
+self_encryption = { git = "https://github.com/grumbach/self_encryption.git", branch = "post_quatum" } # Hashing (aligned with saorsa-core) blake3 = "1" From e916936350969f188300ef6e7d4ead77e91a9ca7 Mon Sep 17 00:00:00 2001 From: grumbach Date: Thu, 12 Mar 2026 17:43:46 +0900 Subject: [PATCH 6/6] refactor: remove legacy plaintext chunking API (file_ops) Self-encryption fully replaces the legacy FileManifest/split_file API. Remove file_ops module, its exports from mod.rs and lib.rs, and update CLI help text to no longer reference manifest addresses. --- src/bin/saorsa-cli/cli.rs | 2 +- src/client/file_ops.rs | 189 -------------------------------------- src/client/mod.rs | 5 - src/lib.rs | 7 +- 4 files changed, 3 insertions(+), 200 deletions(-) delete mode 100644 src/client/file_ops.rs diff --git a/src/bin/saorsa-cli/cli.rs b/src/bin/saorsa-cli/cli.rs index 02ba4b55..627ac628 100644 --- a/src/bin/saorsa-cli/cli.rs +++ b/src/bin/saorsa-cli/cli.rs @@ -81,7 +81,7 @@ pub enum FileAction { }, /// Download a file from the network. Download { - /// Hex-encoded address (public data map address or manifest address). + /// Hex-encoded address (public data map address). address: Option, /// Path to a local data map file (for private downloads). #[arg(long)] diff --git a/src/client/file_ops.rs b/src/client/file_ops.rs deleted file mode 100644 index 6e4dc2a1..00000000 --- a/src/client/file_ops.rs +++ /dev/null @@ -1,189 +0,0 @@ -//! File chunking and reassembly operations. -//! -//! Files are split into chunks of up to `MAX_CHUNK_SIZE` (4 MB). A manifest -//! chunk stores the ordered list of chunk addresses and the original file -//! metadata so the file can be reconstructed from the network. - -use super::data_types::compute_address; -use crate::ant_protocol::MAX_CHUNK_SIZE; -use crate::error::{Error, Result}; -use bytes::Bytes; -use serde::{Deserialize, Serialize}; - -/// A file manifest that describes how to reassemble a file from its chunks. 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct FileManifest {
-    /// Original file name (if known).
-    pub filename: Option<String>,
-    /// Total file size in bytes.
-    pub total_size: u64,
-    /// Ordered list of chunk addresses (SHA256 hashes).
-    pub chunk_addresses: Vec<[u8; 32]>,
-}
-
-/// Split file content into chunks of at most `MAX_CHUNK_SIZE`.
-///
-/// Returns a list of `Bytes` chunks in order.
-#[must_use]
-pub fn split_file(content: &[u8]) -> Vec<Bytes> {
-    if content.is_empty() {
-        return vec![Bytes::from_static(b"")];
-    }
-
-    content
-        .chunks(MAX_CHUNK_SIZE)
-        .map(Bytes::copy_from_slice)
-        .collect()
-}
-
-/// Create a `FileManifest` from the file content and chunk addresses.
-#[must_use]
-pub fn create_manifest(
-    filename: Option<String>,
-    total_size: u64,
-    chunk_addresses: Vec<[u8; 32]>,
-) -> FileManifest {
-    FileManifest {
-        filename,
-        total_size,
-        chunk_addresses,
-    }
-}
-
-/// Serialize a manifest to bytes suitable for storing as a chunk.
-///
-/// # Errors
-///
-/// Returns an error if serialization fails.
-pub fn serialize_manifest(manifest: &FileManifest) -> Result<Bytes> {
-    let bytes = rmp_serde::to_vec(manifest)
-        .map_err(|e| Error::Serialization(format!("Failed to serialize manifest: {e}")))?;
-    Ok(Bytes::from(bytes))
-}
-
-/// Deserialize a manifest from bytes.
-///
-/// # Errors
-///
-/// Returns an error if deserialization fails.
-pub fn deserialize_manifest(bytes: &[u8]) -> Result<FileManifest> {
-    rmp_serde::from_slice(bytes)
-        .map_err(|e| Error::Serialization(format!("Failed to deserialize manifest: {e}")))
-}
-
-/// Reassemble file content from ordered chunks.
-///
-/// Validates that total reassembled size matches the manifest.
-///
-/// # Errors
-///
-/// Returns an error if the reassembled size doesn't match the manifest.
-pub fn reassemble_file(manifest: &FileManifest, chunks: &[Bytes]) -> Result<Bytes> {
-    let total: usize = chunks.iter().map(Bytes::len).sum();
-    let expected = usize::try_from(manifest.total_size)
-        .map_err(|e| Error::InvalidChunk(format!("File size too large for platform: {e}")))?;
-
-    if total != expected {
-        return Err(Error::InvalidChunk(format!(
-            "Reassembled size {total} does not match manifest size {expected}"
-        )));
-    }
-
-    let mut result = Vec::with_capacity(total);
-    for chunk in chunks {
-        result.extend_from_slice(chunk);
-    }
-    Ok(Bytes::from(result))
-}
-
-/// Compute the address for file content (for verification).
-#[must_use]
-pub fn compute_chunk_address(content: &[u8]) -> [u8; 32] {
-    compute_address(content)
-}
-
-#[cfg(test)]
-#[allow(clippy::unwrap_used, clippy::expect_used)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_split_empty_file() {
-        let chunks = split_file(b"");
-        assert_eq!(chunks.len(), 1);
-        assert!(chunks.first().unwrap().is_empty());
-    }
-
-    #[test]
-    fn test_split_small_file() {
-        let data = b"hello world";
-        let chunks = split_file(data);
-        assert_eq!(chunks.len(), 1);
-        assert_eq!(chunks.first().unwrap().as_ref(), data);
-    }
-
-    #[test]
-    fn test_split_exact_chunk_size() {
-        let data = vec![0xABu8; MAX_CHUNK_SIZE];
-        let chunks = split_file(&data);
-        assert_eq!(chunks.len(), 1);
-        assert_eq!(chunks.first().unwrap().len(), MAX_CHUNK_SIZE);
-    }
-
-    #[test]
-    fn test_split_multiple_chunks() {
-        let data = vec![0xCDu8; MAX_CHUNK_SIZE * 2 + 100];
-        let chunks = split_file(&data);
-        assert_eq!(chunks.len(), 3);
-        assert_eq!(chunks.first().unwrap().len(), MAX_CHUNK_SIZE);
-        assert_eq!(chunks.get(1).unwrap().len(), MAX_CHUNK_SIZE);
-        assert_eq!(chunks.get(2).unwrap().len(), 100);
-    }
-
-    #[test]
-    fn test_manifest_roundtrip() {
-        let manifest = create_manifest(
-            Some("test.txt".to_string()),
-            1024,
-            vec![[1u8; 32], [2u8; 32]],
-        );
-
-        let bytes = serialize_manifest(&manifest).unwrap();
-        let deserialized =
deserialize_manifest(&bytes).unwrap(); - - assert_eq!(deserialized.filename.as_deref(), Some("test.txt")); - assert_eq!(deserialized.total_size, 1024); - assert_eq!(deserialized.chunk_addresses.len(), 2); - } - - #[test] - fn test_reassemble_file() { - let original = b"hello world, this is a test file for reassembly"; - let chunks = split_file(original); - let addresses: Vec<[u8; 32]> = chunks.iter().map(|c| compute_chunk_address(c)).collect(); - - let manifest = create_manifest(None, original.len() as u64, addresses); - let reassembled = reassemble_file(&manifest, &chunks).unwrap(); - assert_eq!(reassembled.as_ref(), original); - } - - #[test] - fn test_reassemble_size_mismatch() { - let manifest = create_manifest(None, 9999, vec![[1u8; 32]]); - let chunks = vec![Bytes::from_static(b"small")]; - let result = reassemble_file(&manifest, &chunks); - assert!(result.is_err()); - } - - #[test] - fn test_split_and_reassemble_large() { - let data = vec![0xFFu8; MAX_CHUNK_SIZE * 3 + 500]; - let chunks = split_file(&data); - assert_eq!(chunks.len(), 4); - - let addresses: Vec<[u8; 32]> = chunks.iter().map(|c| compute_chunk_address(c)).collect(); - let manifest = create_manifest(None, data.len() as u64, addresses); - let reassembled = reassemble_file(&manifest, &chunks).unwrap(); - assert_eq!(reassembled.as_ref(), data.as_slice()); - } -} diff --git a/src/client/mod.rs b/src/client/mod.rs index 15c80b7f..9a261d59 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -55,7 +55,6 @@ mod chunk_protocol; mod data_types; -pub mod file_ops; mod quantum; pub mod self_encrypt; @@ -63,8 +62,4 @@ pub use chunk_protocol::send_and_await_chunk_response; pub use data_types::{ compute_address, peer_id_to_xor_name, xor_distance, ChunkStats, DataChunk, XorName, }; -pub use file_ops::{ - create_manifest, deserialize_manifest, reassemble_file, serialize_manifest, split_file, - FileManifest, -}; pub use quantum::{hex_node_id_to_encoded_peer_id, QuantumClient, QuantumConfig}; diff --git 
a/src/lib.rs b/src/lib.rs index b08e49f9..15bb5a70 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,12 +58,9 @@ pub use client::self_encrypt::{ deserialize_data_map, download_and_decrypt_file, encrypt_and_upload_file, fetch_data_map_public, serialize_data_map, store_data_map_public, }; -// Legacy plaintext chunking API — retained for backward compatibility with -// data already stored on the network without self-encryption. pub use client::{ - compute_address, create_manifest, deserialize_manifest, peer_id_to_xor_name, reassemble_file, - serialize_manifest, split_file, xor_distance, DataChunk, FileManifest, QuantumClient, - QuantumConfig, XorName, + compute_address, peer_id_to_xor_name, xor_distance, DataChunk, QuantumClient, QuantumConfig, + XorName, }; pub use config::{BootstrapCacheConfig, NodeConfig, StorageConfig}; pub use devnet::{Devnet, DevnetConfig, DevnetEvmInfo, DevnetManifest};