diff --git a/Cargo.toml b/Cargo.toml index 44b27b91..ad24da15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "saorsa-node" -version = "0.3.1" +version = "0.3.2" edition = "2021" authors = ["David Irvine "] description = "Pure quantum-proof network node for the Saorsa decentralized network" @@ -79,7 +79,7 @@ flate2 = "1" tar = "0.4" # Protocol serialization -bincode = "1" +postcard = { version = "1.1.3", features = ["use-std"] } [dev-dependencies] tokio-test = "0.4" diff --git a/src/ant_protocol/chunk.rs b/src/ant_protocol/chunk.rs index b8abf42c..c4e2a088 100644 --- a/src/ant_protocol/chunk.rs +++ b/src/ant_protocol/chunk.rs @@ -4,9 +4,8 @@ //! is the SHA256 hash of the content. Maximum size is 4MB. //! //! This module defines the wire protocol messages for chunk operations -//! using bincode serialization for compact, fast encoding. +//! using postcard serialization for compact, fast encoding. -use bincode::Options; use serde::{Deserialize, Serialize}; /// Protocol identifier for chunk operations. @@ -18,13 +17,12 @@ pub const PROTOCOL_VERSION: u16 = 1; /// Maximum chunk size in bytes (4MB). pub const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024; -/// Maximum wire message size for deserialization. +/// Maximum wire message size in bytes (5MB). /// -/// Set to `MAX_CHUNK_SIZE` + 1 MB headroom for the envelope (`request_id`, -/// enum discriminants, address, payment proof, etc.). This prevents a -/// malicious peer from sending a length-prefixed `Vec` that causes an -/// unbounded allocation. -const MAX_WIRE_MESSAGE_SIZE: u64 = (MAX_CHUNK_SIZE + 1024 * 1024) as u64; +/// Limits the input buffer accepted by [`ChunkMessage::decode`] to prevent +/// unbounded allocation from malicious or corrupted payloads. Set slightly +/// above [`MAX_CHUNK_SIZE`] to accommodate message envelope overhead. +pub const MAX_WIRE_MESSAGE_SIZE: usize = 5 * 1024 * 1024; /// Data type identifier for chunks. pub const DATA_TYPE_CHUNK: u32 = 0; @@ -66,30 +64,33 @@ pub struct ChunkMessage { } impl ChunkMessage { - /// Encode the message to bytes using bincode. + /// Encode the message to bytes using postcard. /// /// # Errors /// /// Returns an error if serialization fails. pub fn encode(&self) -> Result, ProtocolError> { - bincode::DefaultOptions::new() - .with_limit(MAX_WIRE_MESSAGE_SIZE) - .allow_trailing_bytes() - .serialize(self) - .map_err(|e| ProtocolError::SerializationFailed(e.to_string())) + postcard::to_stdvec(self).map_err(|e| ProtocolError::SerializationFailed(e.to_string())) } - /// Decode a message from bytes using bincode. + /// Decode a message from bytes using postcard. + /// + /// Rejects payloads larger than [`MAX_WIRE_MESSAGE_SIZE`] before + /// attempting deserialization. /// /// # Errors /// - /// Returns an error if deserialization fails. + /// Returns [`ProtocolError::MessageTooLarge`] if the input exceeds the + /// size limit, or [`ProtocolError::DeserializationFailed`] if postcard + /// cannot parse the data. pub fn decode(data: &[u8]) -> Result { - bincode::DefaultOptions::new() - .with_limit(MAX_WIRE_MESSAGE_SIZE) - .allow_trailing_bytes() - .deserialize(data) - .map_err(|e| ProtocolError::DeserializationFailed(e.to_string())) + if data.len() > MAX_WIRE_MESSAGE_SIZE { + return Err(ProtocolError::MessageTooLarge { + size: data.len(), + max_size: MAX_WIRE_MESSAGE_SIZE, + }); + } + postcard::from_bytes(data).map_err(|e| ProtocolError::DeserializationFailed(e.to_string())) } } @@ -241,6 +242,13 @@ pub enum ProtocolError { SerializationFailed(String), /// Message deserialization failed. DeserializationFailed(String), + /// Wire message exceeds the maximum allowed size. + MessageTooLarge { + /// Actual size of the message in bytes. + size: usize, + /// Maximum allowed size. + max_size: usize, + }, /// Chunk exceeds maximum size. ChunkTooLarge { /// Size of the chunk in bytes. @@ -270,6 +278,9 @@ impl std::fmt::Display for ProtocolError { match self { Self::SerializationFailed(msg) => write!(f, "serialization failed: {msg}"), Self::DeserializationFailed(msg) => write!(f, "deserialization failed: {msg}"), + Self::MessageTooLarge { size, max_size } => { + write!(f, "message size {size} exceeds maximum {max_size}") + } Self::ChunkTooLarge { size, max_size } => { write!(f, "chunk size {size} exceeds maximum {max_size}") } @@ -434,6 +445,18 @@ mod tests { assert!(display.contains("address mismatch")); } + #[test] + fn test_decode_rejects_oversized_payload() { + let oversized = vec![0u8; MAX_WIRE_MESSAGE_SIZE + 1]; + let result = ChunkMessage::decode(&oversized); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + matches!(err, ProtocolError::MessageTooLarge { .. }), + "expected MessageTooLarge, got {err:?}" + ); + } + #[test] fn test_invalid_decode() { let invalid_data = vec![0xFF, 0xFF, 0xFF]; diff --git a/src/ant_protocol/mod.rs b/src/ant_protocol/mod.rs index 5f86ec4c..0d9b7457 100644 --- a/src/ant_protocol/mod.rs +++ b/src/ant_protocol/mod.rs @@ -14,7 +14,7 @@ //! //! # Protocol Overview //! -//! The protocol uses bincode serialization for compact, fast encoding. +//! The protocol uses postcard serialization for compact, fast encoding. //! Each data type has its own message types for PUT/GET operations. //! //! ## Chunk Messages diff --git a/src/client/chunk_protocol.rs b/src/client/chunk_protocol.rs index f97d2399..988bf27e 100644 --- a/src/client/chunk_protocol.rs +++ b/src/client/chunk_protocol.rs @@ -6,8 +6,9 @@ use crate::ant_protocol::{ChunkMessage, ChunkMessageBody, CHUNK_PROTOCOL_ID}; use saorsa_core::{P2PEvent, P2PNode}; use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; use tokio::time::Instant; -use tracing::warn; +use tracing::{debug, warn}; /// Send a chunk-protocol message to `target_peer` and await a matching response. /// @@ -70,7 +71,10 @@ pub async fn send_and_await_chunk_response( } } Ok(Ok(_)) => {} - Ok(Err(_)) | Err(_) => break, + Ok(Err(RecvError::Lagged(skipped))) => { + debug!("Chunk protocol events lagged by {skipped} messages, continuing"); + } + Ok(Err(RecvError::Closed)) | Err(_) => break, } } diff --git a/src/client/quantum.rs b/src/client/quantum.rs index 3b462381..8537a5e2 100644 --- a/src/client/quantum.rs +++ b/src/client/quantum.rs @@ -28,7 +28,7 @@ use saorsa_core::P2PNode; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; /// Default timeout for network operations in seconds. const DEFAULT_TIMEOUT_SECS: u64 = 30; @@ -154,12 +154,39 @@ impl QuantumClient { address: addr, content, }) => { - debug!( - "Found chunk {} on saorsa network ({} bytes)", - hex::encode(addr), - content.len() - ); - Some(Ok(Some(DataChunk::new(addr, Bytes::from(content))))) + if addr == *address { + let computed = crate::client::compute_address(&content); + if computed == addr { + debug!( + "Found chunk {} on saorsa network ({} bytes)", + hex::encode(addr), + content.len() + ); + Some(Ok(Some(DataChunk::new(addr, Bytes::from(content))))) + } else { + warn!( + "Peer returned chunk {} with invalid content hash {}", + addr_hex, + hex::encode(computed) + ); + Some(Err(Error::InvalidChunk(format!( + "Invalid chunk content: expected hash {}, got {}", + addr_hex, + hex::encode(computed) + )))) + } + } else { + warn!( + "Peer returned chunk {} but we requested {}", + hex::encode(addr), + addr_hex + ); + Some(Err(Error::InvalidChunk(format!( + "Mismatched chunk address: expected {}, got {}", + addr_hex, + hex::encode(addr) + )))) + } } ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => { debug!("Chunk {} not found on saorsa network", addr_hex); diff --git a/src/storage/disk.rs b/src/storage/disk.rs index a71e4513..e48b864f 100644 --- a/src/storage/disk.rs +++ b/src/storage/disk.rs @@ -12,8 +12,10 @@ use crate::ant_protocol::XorName; use crate::error::{Error, Result}; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; use tokio::fs; use tokio::io::AsyncWriteExt; +use tokio::task::spawn_blocking; use tracing::{debug, trace, warn}; /// Configuration for disk storage. @@ -52,6 +54,8 @@ pub struct StorageStats { pub duplicates: u64, /// Number of verification failures on read. pub verification_failures: u64, + /// Number of chunks currently persisted on disk. + pub current_chunks: u64, } /// Content-addressed disk storage. @@ -65,6 +69,8 @@ pub struct DiskStorage { config: DiskStorageConfig, /// Operation statistics. stats: parking_lot::RwLock, + /// Current number of chunks on disk. + current_chunks: AtomicU64, } impl DiskStorage { @@ -82,9 +88,16 @@ impl DiskStorage { debug!("Initialized disk storage at {:?}", config.root_dir); + let scan_dir = chunks_dir.clone(); + let existing_chunks = spawn_blocking(move || Self::count_existing_chunks(&scan_dir)) + .await + .map_err(|e| Error::Storage(format!("Failed to count chunks: {e}")))? + .map_err(|e| Error::Storage(format!("Failed to count chunks: {e}")))?; + Ok(Self { config, stats: parking_lot::RwLock::new(StorageStats::default()), + current_chunks: AtomicU64::new(existing_chunks), }) } @@ -128,41 +141,74 @@ impl DiskStorage { } // Enforce max_chunks capacity limit (0 = unlimited) + let mut reserved_slot = false; + let mut increment_after_commit = false; if self.config.max_chunks > 0 { - let chunks_stored = self.stats.read().chunks_stored; - if chunks_stored >= self.config.max_chunks as u64 { - return Err(Error::Storage(format!( - "Storage capacity reached: {} chunks stored, max is {}", - chunks_stored, self.config.max_chunks - ))); + let max_chunks = self.config.max_chunks as u64; + match self + .current_chunks + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { + if current >= max_chunks { + None + } else { + Some(current + 1) + } + }) { + Ok(_) => { + reserved_slot = true; + } + Err(current) => { + return Err(Error::Storage(format!( + "Storage capacity reached: {} chunks stored, max is {}", + current, self.config.max_chunks + ))); + } } + } else { + increment_after_commit = true; } + let mut release_slot = || { + if reserved_slot { + self.current_chunks.fetch_sub(1, Ordering::SeqCst); + reserved_slot = false; + } + }; + // Ensure parent directories exist if let Some(parent) = chunk_path.parent() { - fs::create_dir_all(parent) - .await - .map_err(|e| Error::Storage(format!("Failed to create shard directory: {e}")))?; + fs::create_dir_all(parent).await.map_err(|e| { + release_slot(); + Error::Storage(format!("Failed to create shard directory: {e}")) + })?; } // Atomic write: temp file + rename let temp_path = chunk_path.with_extension("tmp"); - let mut file = fs::File::create(&temp_path) - .await - .map_err(|e| Error::Storage(format!("Failed to create temp file: {e}")))?; + let mut file = fs::File::create(&temp_path).await.map_err(|e| { + release_slot(); + Error::Storage(format!("Failed to create temp file: {e}")) + })?; - file.write_all(content) - .await - .map_err(|e| Error::Storage(format!("Failed to write chunk: {e}")))?; + file.write_all(content).await.map_err(|e| { + release_slot(); + Error::Storage(format!("Failed to write chunk: {e}")) + })?; - file.flush() - .await - .map_err(|e| Error::Storage(format!("Failed to flush chunk: {e}")))?; + file.flush().await.map_err(|e| { + release_slot(); + Error::Storage(format!("Failed to flush chunk: {e}")) + })?; // Rename for atomic commit - fs::rename(&temp_path, &chunk_path) - .await - .map_err(|e| Error::Storage(format!("Failed to rename temp file: {e}")))?; + fs::rename(&temp_path, &chunk_path).await.map_err(|e| { + release_slot(); + Error::Storage(format!("Failed to rename temp file: {e}")) + })?; + + if increment_after_commit { + self.current_chunks.fetch_add(1, Ordering::SeqCst); + } { let mut stats = self.stats.write(); @@ -261,6 +307,8 @@ impl DiskStorage { .await .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?; + self.current_chunks.fetch_sub(1, Ordering::SeqCst); + debug!("Deleted chunk {}", hex::encode(address)); Ok(true) @@ -269,7 +317,9 @@ impl DiskStorage { /// Get storage statistics. #[must_use] pub fn stats(&self) -> StorageStats { - self.stats.read().clone() + let mut stats = self.stats.read().clone(); + stats.current_chunks = self.current_chunks.load(Ordering::SeqCst); + stats } /// Get the path for a chunk. @@ -298,6 +348,24 @@ impl DiskStorage { pub fn root_dir(&self) -> &Path { &self.config.root_dir } + + fn count_existing_chunks(dir: &Path) -> std::io::Result { + if !dir.exists() { + return Ok(0); + } + + let mut count = 0u64; + for entry in std::fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + count += Self::count_existing_chunks(&path)?; + } else if matches!(path.extension().and_then(|ext| ext.to_str()), Some("chunk")) { + count += 1; + } + } + Ok(count) + } } #[cfg(test)] @@ -489,5 +557,31 @@ mod tests { content1.len() as u64 + content2.len() as u64 ); assert_eq!(stats.bytes_retrieved, content1.len() as u64); + assert_eq!(stats.current_chunks, 2); + } + + #[tokio::test] + async fn test_capacity_recovers_after_delete() { + let temp_dir = TempDir::new().expect("create temp dir"); + let config = DiskStorageConfig { + root_dir: temp_dir.path().to_path_buf(), + verify_on_read: true, + max_chunks: 1, + }; + let storage = DiskStorage::new(config).await.expect("create storage"); + + let first = b"first chunk"; + let second = b"second chunk"; + let addr1 = DiskStorage::compute_address(first); + let addr2 = DiskStorage::compute_address(second); + + storage.put(&addr1, first).await.expect("put first"); + storage.delete(&addr1).await.expect("delete first"); + + // Should succeed because delete freed capacity. + storage.put(&addr2, second).await.expect("put second"); + + let stats = storage.stats(); + assert_eq!(stats.current_chunks, 1); } } diff --git a/src/storage/handler.rs b/src/storage/handler.rs index e13da869..d6758423 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -227,8 +227,13 @@ impl AntProtocol { request.data_size ); - // Validate data size - data_size is u64, cast carefully - let data_size_usize = usize::try_from(request.data_size).unwrap_or(usize::MAX); + // Validate data size - data_size is u64, cast carefully and reject overflow + let Ok(data_size_usize) = usize::try_from(request.data_size) else { + return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge { + size: MAX_CHUNK_SIZE + 1, + max_size: MAX_CHUNK_SIZE, + }); + }; if data_size_usize > MAX_CHUNK_SIZE { return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge { size: data_size_usize, diff --git a/tests/e2e/integration_tests.rs b/tests/e2e/integration_tests.rs index 8105ba0a..62c1132e 100644 --- a/tests/e2e/integration_tests.rs +++ b/tests/e2e/integration_tests.rs @@ -237,20 +237,21 @@ async fn test_node_to_node_messaging() { // Race all receivers concurrently instead of polling sequentially. // Pin the deadline sleep once so it tracks cumulative time across loop // iterations — otherwise select_all always wins the race against a - // freshly-created sleep and the timeout never fires. + // freshly-created sleep and the timeout never fires. Pinning prevents the + // timeout from being recreated on each loop iteration, which would reset + // its deadline. let timeout = tokio::time::sleep_until(deadline); tokio::pin!(timeout); while !received { let futs: Vec<_> = all_rx .iter_mut() - .enumerate() - .map(|(i, rx)| Box::pin(async move { (i, rx.recv().await) })) + .map(|rx| Box::pin(async move { rx.recv().await })) .collect(); tokio::select! { - (result, idx, _remaining) = futures::future::select_all(futs) => { - if let (_, Ok(P2PEvent::Message { ref topic, ref data, .. })) = (idx, &result.1) { + (result, _idx, _remaining) = futures::future::select_all(futs) => { + if let Ok(P2PEvent::Message { ref topic, ref data, .. }) = result { if topic == TEST_TOPIC && data.as_slice() == PAYLOAD { received = true; } diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index f4c96810..209c6c70 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -5,10 +5,10 @@ //! //! ## Protocol-Based Testing //! -//! Each test node includes a `AntProtocol` handler that processes chunk +//! Each test node includes an `AntProtocol` handler that processes chunk //! PUT/GET requests using the autonomi protocol messages. This allows E2E //! tests to validate the complete protocol flow including: -//! - Message encoding/decoding (bincode serialization) +//! - Message encoding/decoding (postcard serialization) //! - Content address verification //! - Payment verification (when enabled) //! - Disk storage persistence @@ -314,7 +314,7 @@ pub struct TestNode { /// Reference to the running P2P node. pub p2p_node: Option>, - /// ANT protocol handler for processing chunk PUT/GET requests. + /// ANT protocol handler (`AntProtocol`) for processing chunk PUT/GET requests. pub ant_protocol: Option>, /// Is this a bootstrap node? @@ -326,7 +326,10 @@ pub struct TestNode { /// Bootstrap addresses this node connects to. pub bootstrap_addrs: Vec, - /// Protocol handler background task. + /// Protocol handler background task handle. + /// + /// Populated once the node starts and the protocol router is spawned. + /// Dropped (and aborted) during teardown so tests don't leave tasks behind. protocol_task: Option>, }