diff --git a/Cargo.toml b/Cargo.toml index a12ac7e9..44b27b91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,10 @@ path = "src/bin/keygen.rs" [dependencies] # Core (provides EVERYTHING: networking, DHT, security, trust, storage) -saorsa-core = { version = "0.10.0", default-features = false } +saorsa-core = "0.10.2" saorsa-pqc = "0.4.0" # Payment verification - autonomi network lookup + EVM payment -autonomi = "0.9" ant-evm = "0.1.19" evmlib = "0.4.7" xor_name = "5" @@ -79,12 +78,14 @@ tempfile = "3" flate2 = "1" tar = "0.4" +# Protocol serialization +bincode = "1" + [dev-dependencies] tokio-test = "0.4" proptest = "1" serde_json = "1" rand = "0.8" -bincode = "1" # E2E test infrastructure [[test]] diff --git a/src/ant_protocol/chunk.rs b/src/ant_protocol/chunk.rs new file mode 100644 index 00000000..b8abf42c --- /dev/null +++ b/src/ant_protocol/chunk.rs @@ -0,0 +1,451 @@ +//! Chunk message types for the ANT protocol. +//! +//! Chunks are immutable, content-addressed data blocks where the address +//! is the SHA256 hash of the content. Maximum size is 4MB. +//! +//! This module defines the wire protocol messages for chunk operations +//! using bincode serialization for compact, fast encoding. + +use bincode::Options; +use serde::{Deserialize, Serialize}; + +/// Protocol identifier for chunk operations. +pub const CHUNK_PROTOCOL_ID: &str = "saorsa/ant/chunk/v1"; + +/// Current protocol version. +pub const PROTOCOL_VERSION: u16 = 1; + +/// Maximum chunk size in bytes (4MB). +pub const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024; + +/// Maximum wire message size for deserialization. +/// +/// Set to `MAX_CHUNK_SIZE` + 1 MB headroom for the envelope (`request_id`, +/// enum discriminants, address, payment proof, etc.). This prevents a +/// malicious peer from sending a length-prefixed `Vec` that causes an +/// unbounded allocation. +const MAX_WIRE_MESSAGE_SIZE: u64 = (MAX_CHUNK_SIZE + 1024 * 1024) as u64; + +/// Data type identifier for chunks. 
+pub const DATA_TYPE_CHUNK: u32 = 0; + +/// Content-addressed identifier (32 bytes). +pub type XorName = [u8; 32]; + +/// Enum of all chunk protocol message types. +/// +/// Uses a single-byte discriminant for efficient wire encoding. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChunkMessageBody { + /// Request to store a chunk. + PutRequest(ChunkPutRequest), + /// Response to a PUT request. + PutResponse(ChunkPutResponse), + /// Request to retrieve a chunk. + GetRequest(ChunkGetRequest), + /// Response to a GET request. + GetResponse(ChunkGetResponse), + /// Request a storage quote. + QuoteRequest(ChunkQuoteRequest), + /// Response with a storage quote. + QuoteResponse(ChunkQuoteResponse), +} + +/// Wire-format wrapper that pairs a sender-assigned `request_id` with +/// a [`ChunkMessageBody`]. +/// +/// The sender picks a unique `request_id`; the handler echoes it back +/// in the response so callers can correlate replies by ID rather than +/// by source peer. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkMessage { + /// Sender-assigned identifier, echoed back in the response. + pub request_id: u64, + /// The protocol message body. + pub body: ChunkMessageBody, +} + +impl ChunkMessage { + /// Encode the message to bytes using bincode. + /// + /// # Errors + /// + /// Returns an error if serialization fails. + pub fn encode(&self) -> Result<Vec<u8>, ProtocolError> { + bincode::DefaultOptions::new() + .with_limit(MAX_WIRE_MESSAGE_SIZE) + .allow_trailing_bytes() + .serialize(self) + .map_err(|e| ProtocolError::SerializationFailed(e.to_string())) + } + + /// Decode a message from bytes using bincode. + /// + /// # Errors + /// + /// Returns an error if deserialization fails. 
+ pub fn decode(data: &[u8]) -> Result<Self, ProtocolError> { + bincode::DefaultOptions::new() + .with_limit(MAX_WIRE_MESSAGE_SIZE) + .allow_trailing_bytes() + .deserialize(data) + .map_err(|e| ProtocolError::DeserializationFailed(e.to_string())) + } +} + +// ============================================================================= +// PUT Request/Response +// ============================================================================= + +/// Request to store a chunk. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkPutRequest { + /// The content-addressed identifier (SHA256 of content). + pub address: XorName, + /// The chunk data. + pub content: Vec<u8>, + /// Optional payment proof (serialized `ProofOfPayment`). + /// Required for new chunks unless already verified. + pub payment_proof: Option<Vec<u8>>, +} + +impl ChunkPutRequest { + /// Create a new PUT request. + #[must_use] + pub fn new(address: XorName, content: Vec<u8>) -> Self { + Self { + address, + content, + payment_proof: None, + } + } + + /// Create a new PUT request with payment proof. + #[must_use] + pub fn with_payment(address: XorName, content: Vec<u8>, payment_proof: Vec<u8>) -> Self { + Self { + address, + content, + payment_proof: Some(payment_proof), + } + } +} + +/// Response to a PUT request. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChunkPutResponse { + /// Chunk stored successfully. + Success { + /// The address where the chunk was stored. + address: XorName, + }, + /// Chunk already exists (idempotent success). + AlreadyExists { + /// The existing chunk address. + address: XorName, + }, + /// Payment is required to store this chunk. + PaymentRequired { + /// Error message. + message: String, + }, + /// An error occurred. + Error(ProtocolError), +} + +// ============================================================================= +// GET Request/Response +// ============================================================================= + +/// Request to retrieve a chunk. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkGetRequest { + /// The content-addressed identifier to retrieve. + pub address: XorName, +} + +impl ChunkGetRequest { + /// Create a new GET request. + #[must_use] + pub fn new(address: XorName) -> Self { + Self { address } + } +} + +/// Response to a GET request. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChunkGetResponse { + /// Chunk found and returned. + Success { + /// The chunk address. + address: XorName, + /// The chunk data. + content: Vec<u8>, + }, + /// Chunk not found. + NotFound { + /// The requested address. + address: XorName, + }, + /// An error occurred. + Error(ProtocolError), +} + +// ============================================================================= +// Quote Request/Response +// ============================================================================= + +/// Request a storage quote for a chunk. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkQuoteRequest { + /// The content address of the data to store. + pub address: XorName, + /// Size of the data in bytes. + pub data_size: u64, + /// Data type identifier (0 for chunks). + pub data_type: u32, +} + +impl ChunkQuoteRequest { + /// Create a new quote request. + #[must_use] + pub fn new(address: XorName, data_size: u64) -> Self { + Self { + address, + data_size, + data_type: DATA_TYPE_CHUNK, + } + } +} + +/// Response with a storage quote. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChunkQuoteResponse { + /// Quote generated successfully. + Success { + /// Serialized `PaymentQuote`. + quote: Vec<u8>, + }, + /// Quote generation failed. + Error(ProtocolError), +} + +// ============================================================================= +// Protocol Errors +// ============================================================================= + +/// Errors that can occur during protocol operations. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum ProtocolError { + /// Message serialization failed. + SerializationFailed(String), + /// Message deserialization failed. + DeserializationFailed(String), + /// Chunk exceeds maximum size. + ChunkTooLarge { + /// Size of the chunk in bytes. + size: usize, + /// Maximum allowed size. + max_size: usize, + }, + /// Content address mismatch (hash(content) != address). + AddressMismatch { + /// Expected address. + expected: XorName, + /// Actual address computed from content. + actual: XorName, + }, + /// Storage operation failed. + StorageFailed(String), + /// Payment verification failed. + PaymentFailed(String), + /// Quote generation failed. + QuoteFailed(String), + /// Internal error. + Internal(String), +} + +impl std::fmt::Display for ProtocolError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::SerializationFailed(msg) => write!(f, "serialization failed: {msg}"), + Self::DeserializationFailed(msg) => write!(f, "deserialization failed: {msg}"), + Self::ChunkTooLarge { size, max_size } => { + write!(f, "chunk size {size} exceeds maximum {max_size}") + } + Self::AddressMismatch { expected, actual } => { + write!( + f, + "address mismatch: expected {}, got {}", + hex::encode(expected), + hex::encode(actual) + ) + } + Self::StorageFailed(msg) => write!(f, "storage failed: {msg}"), + Self::PaymentFailed(msg) => write!(f, "payment failed: {msg}"), + Self::QuoteFailed(msg) => write!(f, "quote failed: {msg}"), + Self::Internal(msg) => write!(f, "internal error: {msg}"), + } + } +} + +impl std::error::Error for ProtocolError {} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +mod tests { + use super::*; + + #[test] + fn test_put_request_encode_decode() { + let address = [0xAB; 32]; + let content = vec![1, 2, 3, 4, 5]; + let request = ChunkPutRequest::new(address, content.clone()); + let msg = ChunkMessage { + 
request_id: 42, + body: ChunkMessageBody::PutRequest(request), + }; + + let encoded = msg.encode().expect("encode should succeed"); + let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed"); + + assert_eq!(decoded.request_id, 42); + if let ChunkMessageBody::PutRequest(req) = decoded.body { + assert_eq!(req.address, address); + assert_eq!(req.content, content); + assert!(req.payment_proof.is_none()); + } else { + panic!("expected PutRequest"); + } + } + + #[test] + fn test_put_request_with_payment() { + let address = [0xAB; 32]; + let content = vec![1, 2, 3, 4, 5]; + let payment = vec![10, 20, 30]; + let request = ChunkPutRequest::with_payment(address, content.clone(), payment.clone()); + + assert_eq!(request.address, address); + assert_eq!(request.content, content); + assert_eq!(request.payment_proof, Some(payment)); + } + + #[test] + fn test_get_request_encode_decode() { + let address = [0xCD; 32]; + let request = ChunkGetRequest::new(address); + let msg = ChunkMessage { + request_id: 7, + body: ChunkMessageBody::GetRequest(request), + }; + + let encoded = msg.encode().expect("encode should succeed"); + let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed"); + + assert_eq!(decoded.request_id, 7); + if let ChunkMessageBody::GetRequest(req) = decoded.body { + assert_eq!(req.address, address); + } else { + panic!("expected GetRequest"); + } + } + + #[test] + fn test_put_response_success() { + let address = [0xEF; 32]; + let response = ChunkPutResponse::Success { address }; + let msg = ChunkMessage { + request_id: 99, + body: ChunkMessageBody::PutResponse(response), + }; + + let encoded = msg.encode().expect("encode should succeed"); + let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed"); + + assert_eq!(decoded.request_id, 99); + if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) = + decoded.body + { + assert_eq!(addr, address); + } else { + panic!("expected 
PutResponse::Success"); + } + } + + #[test] + fn test_get_response_not_found() { + let address = [0x12; 32]; + let response = ChunkGetResponse::NotFound { address }; + let msg = ChunkMessage { + request_id: 0, + body: ChunkMessageBody::GetResponse(response), + }; + + let encoded = msg.encode().expect("encode should succeed"); + let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed"); + + assert_eq!(decoded.request_id, 0); + if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) = + decoded.body + { + assert_eq!(addr, address); + } else { + panic!("expected GetResponse::NotFound"); + } + } + + #[test] + fn test_quote_request_encode_decode() { + let address = [0x34; 32]; + let request = ChunkQuoteRequest::new(address, 1024); + let msg = ChunkMessage { + request_id: 1, + body: ChunkMessageBody::QuoteRequest(request), + }; + + let encoded = msg.encode().expect("encode should succeed"); + let decoded = ChunkMessage::decode(&encoded).expect("decode should succeed"); + + assert_eq!(decoded.request_id, 1); + if let ChunkMessageBody::QuoteRequest(req) = decoded.body { + assert_eq!(req.address, address); + assert_eq!(req.data_size, 1024); + assert_eq!(req.data_type, DATA_TYPE_CHUNK); + } else { + panic!("expected QuoteRequest"); + } + } + + #[test] + fn test_protocol_error_display() { + let err = ProtocolError::ChunkTooLarge { + size: 5_000_000, + max_size: MAX_CHUNK_SIZE, + }; + assert!(err.to_string().contains("5000000")); + assert!(err.to_string().contains(&MAX_CHUNK_SIZE.to_string())); + + let err = ProtocolError::AddressMismatch { + expected: [0xAA; 32], + actual: [0xBB; 32], + }; + let display = err.to_string(); + assert!(display.contains("address mismatch")); + } + + #[test] + fn test_invalid_decode() { + let invalid_data = vec![0xFF, 0xFF, 0xFF]; + let result = ChunkMessage::decode(&invalid_data); + assert!(result.is_err()); + } + + #[test] + fn test_constants() { + assert_eq!(CHUNK_PROTOCOL_ID, "saorsa/ant/chunk/v1"); 
+ assert_eq!(PROTOCOL_VERSION, 1); + assert_eq!(MAX_CHUNK_SIZE, 4 * 1024 * 1024); + assert_eq!(DATA_TYPE_CHUNK, 0); + } +} diff --git a/src/ant_protocol/mod.rs b/src/ant_protocol/mod.rs new file mode 100644 index 00000000..5f86ec4c --- /dev/null +++ b/src/ant_protocol/mod.rs @@ -0,0 +1,56 @@ +//! ANT protocol implementation for the saorsa network. +//! +//! This module implements the wire protocol for storing and retrieving +//! data on the saorsa network, compatible with the autonomi network protocol. +//! +//! # Data Types +//! +//! The ANT protocol supports multiple data types: +//! +//! - **Chunk**: Immutable, content-addressed data (hash == address) +//! - *Scratchpad*: Mutable, owner-indexed data (planned) +//! - *Pointer*: Lightweight mutable references (planned) +//! - *`GraphEntry`*: DAG entries with parent links (planned) +//! +//! # Protocol Overview +//! +//! The protocol uses bincode serialization for compact, fast encoding. +//! Each data type has its own message types for PUT/GET operations. +//! +//! ## Chunk Messages +//! +//! - `ChunkPutRequest` / `ChunkPutResponse` - Store chunks +//! - `ChunkGetRequest` / `ChunkGetResponse` - Retrieve chunks +//! - `ChunkQuoteRequest` / `ChunkQuoteResponse` - Request storage quotes +//! +//! ## Payment Flow +//! +//! 1. Client requests a quote via `ChunkQuoteRequest` +//! 2. Node returns signed `PaymentQuote` in `ChunkQuoteResponse` +//! 3. Client pays on Arbitrum via `PaymentVault.payForQuotes()` +//! 4. Client sends `ChunkPutRequest` with `payment_proof` +//! 5. Node verifies payment and stores chunk +//! +//! # Example +//! +//! ```rust,ignore +//! use saorsa_node::ant_protocol::{ChunkMessage, ChunkPutRequest, ChunkGetRequest}; +//! +//! // Create a PUT request +//! let address = compute_address(&data); +//! let request = ChunkPutRequest::with_payment(address, data, payment_proof); +//! let message = ChunkMessage::PutRequest(request); +//! let bytes = message.encode()?; +//! +//! // Decode a response +//! 
let response = ChunkMessage::decode(&response_bytes)?; +//! ``` + +pub mod chunk; + +// Re-export chunk types for convenience +pub use chunk::{ + ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, + ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, ProtocolError, XorName, + CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK, MAX_CHUNK_SIZE, PROTOCOL_VERSION, +}; diff --git a/src/client/chunk_protocol.rs b/src/client/chunk_protocol.rs new file mode 100644 index 00000000..f97d2399 --- /dev/null +++ b/src/client/chunk_protocol.rs @@ -0,0 +1,78 @@ +//! Shared helper for the chunk protocol request/response pattern. +//! +//! Extracts the duplicated "subscribe → send → poll event loop" into a single +//! generic function used by both [`super::QuantumClient`] and E2E test helpers. + +use crate::ant_protocol::{ChunkMessage, ChunkMessageBody, CHUNK_PROTOCOL_ID}; +use saorsa_core::{P2PEvent, P2PNode}; +use std::time::Duration; +use tokio::time::Instant; +use tracing::warn; + +/// Send a chunk-protocol message to `target_peer` and await a matching response. +/// +/// The event loop filters by topic (`CHUNK_PROTOCOL_ID`), source peer, decode +/// errors (warn + skip), and `request_id` mismatch (skip). +/// +/// * `response_handler` — inspects the decoded [`ChunkMessageBody`] and returns: +/// - `Some(Ok(T))` to resolve successfully, +/// - `Some(Err(E))` to resolve with an error, +/// - `None` to keep waiting (wrong variant / not our response). +/// * `send_error` — produces the caller's error type when `send_message` fails. +/// * `timeout_error` — produces the caller's error type on deadline expiry. +/// +/// # Errors +/// +/// Returns `Err(E)` if sending fails (via `send_error`), the `response_handler` +/// returns a protocol-level error, or the deadline expires (via `timeout_error`). 
+#[allow(clippy::too_many_arguments)] +pub async fn send_and_await_chunk_response<T, E>( + node: &P2PNode, + target_peer: &str, + message_bytes: Vec<u8>, + request_id: u64, + timeout: Duration, + response_handler: impl Fn(ChunkMessageBody) -> Option<Result<T, E>>, + send_error: impl FnOnce(String) -> E, + timeout_error: impl FnOnce() -> E, +) -> Result<T, E> { + // Subscribe before sending so we don't miss the response + let mut events = node.subscribe_events(); + + let target_peer_id = target_peer.to_string(); + + node.send_message(&target_peer_id, CHUNK_PROTOCOL_ID, message_bytes) + .await + .map_err(|e| send_error(e.to_string()))?; + + let deadline = Instant::now() + timeout; + + while Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(Instant::now()); + match tokio::time::timeout(remaining, events.recv()).await { + Ok(Ok(P2PEvent::Message { + topic, + source, + data, + })) if topic == CHUNK_PROTOCOL_ID && source == target_peer_id => { + let response = match ChunkMessage::decode(&data) { + Ok(r) => r, + Err(e) => { + warn!("Failed to decode chunk message, skipping: {e}"); + continue; + } + }; + if response.request_id != request_id { + continue; + } + if let Some(result) = response_handler(response.body) { + return result; + } + } + Ok(Ok(_)) => {} + Ok(Err(_)) | Err(_) => break, + } + } + + Err(timeout_error()) +} diff --git a/src/client/data_types.rs b/src/client/data_types.rs index f8d443e5..413fca35 100644 --- a/src/client/data_types.rs +++ b/src/client/data_types.rs @@ -5,6 +5,18 @@ //! the address is the SHA256 hash of the content. use bytes::Bytes; +use sha2::{Digest, Sha256}; + +/// Compute the content address (SHA256 hash) for the given data. +#[must_use] +pub fn compute_address(content: &[u8]) -> XorName { + let mut hasher = Sha256::new(); + hasher.update(content); + let result = hasher.finalize(); + let mut address = [0u8; 32]; + address.copy_from_slice(&result); + address +} /// A content-addressed identifier (32 bytes). 
/// @@ -39,12 +51,7 @@ impl DataChunk { /// Create a chunk from content, computing the address automatically. #[must_use] pub fn from_content(content: Bytes) -> Self { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(&content); - let result = hasher.finalize(); - let mut address = [0u8; 32]; - address.copy_from_slice(&result); + let address = compute_address(&content); Self { address, content } } @@ -57,11 +64,7 @@ impl DataChunk { /// Verify that the address matches SHA256(content). #[must_use] pub fn verify(&self) -> bool { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(&self.content); - let result = hasher.finalize(); - self.address == result.as_slice() + self.address == compute_address(&self.content) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index a5931b50..90350e29 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -53,8 +53,10 @@ //! - **ML-DSA-65** (NIST FIPS 204): Digital signatures for authentication //! - **ChaCha20-Poly1305**: Symmetric encryption for data at rest +mod chunk_protocol; mod data_types; mod quantum; -pub use data_types::{ChunkStats, DataChunk, XorName}; +pub use chunk_protocol::send_and_await_chunk_response; +pub use data_types::{compute_address, ChunkStats, DataChunk, XorName}; pub use quantum::{QuantumClient, QuantumConfig}; diff --git a/src/client/quantum.rs b/src/client/quantum.rs index ed94af92..3b462381 100644 --- a/src/client/quantum.rs +++ b/src/client/quantum.rs @@ -16,13 +16,26 @@ //! - **ML-DSA-65**: NIST FIPS 204 compliant signatures for authentication //! 
- **ChaCha20-Poly1305**: Symmetric encryption for data at rest +use super::chunk_protocol::send_and_await_chunk_response; use super::data_types::{DataChunk, XorName}; +use crate::ant_protocol::{ + ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, + ChunkPutResponse, +}; use crate::error::{Error, Result}; use bytes::Bytes; use saorsa_core::P2PNode; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Duration; use tracing::{debug, info}; +/// Default timeout for network operations in seconds. +const DEFAULT_TIMEOUT_SECS: u64 = 30; + +/// Default number of replicas for data redundancy. +const DEFAULT_REPLICA_COUNT: u8 = 4; + /// Configuration for the quantum-resistant client. #[derive(Debug, Clone)] pub struct QuantumConfig { @@ -37,8 +50,8 @@ impl Default for QuantumConfig { fn default() -> Self { Self { - timeout_secs: 30, - replica_count: 4, + timeout_secs: DEFAULT_TIMEOUT_SECS, + replica_count: DEFAULT_REPLICA_COUNT, encrypt_data: true, } } @@ -59,6 +72,7 @@ impl Default for QuantumConfig { pub struct QuantumClient { config: QuantumConfig, p2p_node: Option<Arc<P2PNode>>, + next_request_id: AtomicU64, } impl QuantumClient { @@ -69,6 +83,7 @@ impl QuantumClient { Self { config, p2p_node: None, + next_request_id: AtomicU64::new(1), } } @@ -85,7 +100,10 @@ impl QuantumClient { self } - /// Get a chunk from the saorsa network. + /// Get a chunk from the saorsa network via ANT protocol. + /// + /// Sends a `ChunkGetRequest` to a connected peer and waits for the + /// `ChunkGetResponse`. 
/// /// # Arguments /// @@ -108,34 +126,65 @@ impl QuantumClient { return Err(Error::Network("P2P node not configured".into())); }; - let _ = self.config.timeout_secs; // Use config for future timeout implementation - - // Lookup chunk in DHT - match node.dht_get(*address).await { - Ok(Some(data)) => { - debug!( - "Found chunk {} on saorsa network ({} bytes)", - hex::encode(address), - data.len() - ); - Ok(Some(DataChunk::new(*address, Bytes::from(data)))) - } - Ok(None) => { - debug!("Chunk {} not found on saorsa network", hex::encode(address)); - Ok(None) - } - Err(e) => Err(Error::Network(format!( - "DHT lookup failed for {}: {}", - hex::encode(address), - e - ))), - } + let target_peer = Self::pick_target_peer(node).await?; + + // Create and send GET request + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); + let request = ChunkGetRequest::new(*address); + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::GetRequest(request), + }; + let message_bytes = message + .encode() + .map_err(|e| Error::Network(format!("Failed to encode GET request: {e}")))?; + + let timeout = Duration::from_secs(self.config.timeout_secs); + let addr_hex = hex::encode(address); + let timeout_secs = self.config.timeout_secs; + + send_and_await_chunk_response( + node, + &target_peer, + message_bytes, + request_id, + timeout, + |body| match body { + ChunkMessageBody::GetResponse(ChunkGetResponse::Success { + address: addr, + content, + }) => { + debug!( + "Found chunk {} on saorsa network ({} bytes)", + hex::encode(addr), + content.len() + ); + Some(Ok(Some(DataChunk::new(addr, Bytes::from(content))))) + } + ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. 
}) => { + debug!("Chunk {} not found on saorsa network", addr_hex); + Some(Ok(None)) + } + ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err( + Error::Network(format!("Remote GET error for {addr_hex}: {e}")), + )), + _ => None, + }, + |e| Error::Network(format!("Failed to send GET to peer {target_peer}: {e}")), + || { + Error::Network(format!( + "Timeout waiting for chunk {addr_hex} after {timeout_secs}s" + )) + }, + ) + .await } - /// Store a chunk on the saorsa network. + /// Store a chunk on the saorsa network via ANT protocol. /// /// The chunk address is computed as SHA256(content), ensuring content-addressing. - /// The `P2PNode` handles ML-DSA-65 signing internally. + /// Sends a `ChunkPutRequest` to a connected peer and waits for the + /// `ChunkPutResponse`. /// /// # Arguments /// @@ -149,43 +198,82 @@ impl QuantumClient { /// /// Returns an error if the store operation fails. pub async fn put_chunk(&self, content: Bytes) -> Result { - use sha2::{Digest, Sha256}; - debug!("Storing chunk on saorsa network ({} bytes)", content.len()); let Some(ref node) = self.p2p_node else { return Err(Error::Network("P2P node not configured".into())); }; + let target_peer = Self::pick_target_peer(node).await?; + // Compute content address using SHA-256 - let mut hasher = Sha256::new(); - hasher.update(&content); - let hash = hasher.finalize(); - - let mut address = [0u8; 32]; - address.copy_from_slice(&hash); - - let _ = self.config.replica_count; // Used for future replication verification - - // Store in DHT - P2PNode handles ML-DSA-65 signing internally - node.dht_put(address, content.to_vec()).await.map_err(|e| { - Error::Network(format!( - "DHT store failed for {}: {}", - hex::encode(address), - e - )) - })?; - - info!( - "Chunk stored at address: {} ({} bytes)", - hex::encode(address), - content.len() - ); - Ok(address) + let address = crate::client::compute_address(&content); + + // Create PUT request with empty payment proof + let empty_payment 
= rmp_serde::to_vec(&ant_evm::ProofOfPayment { + peer_quotes: vec![], + }) + .map_err(|e| Error::Network(format!("Failed to serialize payment proof: {e}")))?; + + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); + let request = ChunkPutRequest::with_payment(address, content.to_vec(), empty_payment); + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::PutRequest(request), + }; + let message_bytes = message + .encode() + .map_err(|e| Error::Network(format!("Failed to encode PUT request: {e}")))?; + + let timeout = Duration::from_secs(self.config.timeout_secs); + let content_len = content.len(); + let addr_hex = hex::encode(address); + let timeout_secs = self.config.timeout_secs; + + send_and_await_chunk_response( + node, + &target_peer, + message_bytes, + request_id, + timeout, + |body| match body { + ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => { + info!( + "Chunk stored at address: {} ({} bytes)", + hex::encode(addr), + content_len + ); + Some(Ok(addr)) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { + address: addr, + }) => { + info!("Chunk already exists at address: {}", hex::encode(addr)); + Some(Ok(addr)) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => { + Some(Err(Error::Network(format!("Payment required: {message}")))) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err( + Error::Network(format!("Remote PUT error for {addr_hex}: {e}")), + )), + _ => None, + }, + |e| Error::Network(format!("Failed to send PUT to peer {target_peer}: {e}")), + || { + Error::Network(format!( + "Timeout waiting for store response for {addr_hex} after {timeout_secs}s" + )) + }, + ) + .await } /// Check if a chunk exists on the saorsa network. /// + /// Implemented via `get_chunk` — returns `Ok(true)` on success, + /// `Ok(false)` if not found. 
+ /// /// # Arguments /// /// * `address` - The `XorName` to check @@ -202,27 +290,16 @@ impl QuantumClient { "Checking existence on saorsa network: {}", hex::encode(address) ); + self.get_chunk(address).await.map(|opt| opt.is_some()) + } - let Some(ref node) = self.p2p_node else { - return Err(Error::Network("P2P node not configured".into())); - }; - - // Check if data exists in DHT - match node.dht_get(*address).await { - Ok(Some(_)) => { - debug!("Chunk {} exists on saorsa network", hex::encode(address)); - Ok(true) - } - Ok(None) => { - debug!("Chunk {} not found on saorsa network", hex::encode(address)); - Ok(false) - } - Err(e) => Err(Error::Network(format!( - "DHT lookup failed for {}: {}", - hex::encode(address), - e - ))), - } + /// Pick a target peer from the connected peers list. + async fn pick_target_peer(node: &P2PNode) -> Result<String> { + let peers = node.connected_peers().await; + peers + .into_iter() + .next() + .ok_or_else(|| Error::Network("No connected peers available".into())) + } } @@ -234,15 +311,15 @@ mod tests { #[test] fn test_quantum_config_default() { let config = QuantumConfig::default(); - assert_eq!(config.timeout_secs, 30); - assert_eq!(config.replica_count, 4); + assert_eq!(config.timeout_secs, DEFAULT_TIMEOUT_SECS); + assert_eq!(config.replica_count, DEFAULT_REPLICA_COUNT); assert!(config.encrypt_data); } #[test] fn test_quantum_client_creation() { let client = QuantumClient::with_defaults(); - assert_eq!(client.config.timeout_secs, 30); + assert_eq!(client.config.timeout_secs, DEFAULT_TIMEOUT_SECS); assert!(client.p2p_node.is_none()); } diff --git a/src/config.rs b/src/config.rs index b7458a2b..83bf7c23 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,7 @@ //! Configuration for saorsa-node. use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; /// IP version configuration. 
@@ -130,6 +130,10 @@ pub struct NodeConfig { #[serde(default)] pub bootstrap_cache: BootstrapCacheConfig, + /// Storage configuration for chunk persistence. + #[serde(default)] + pub storage: StorageConfig, + /// Log level. #[serde(default = "default_log_level")] pub log_level: String, @@ -364,6 +368,7 @@ impl Default for NodeConfig { payment: PaymentConfig::default(), attestation: AttestationNodeConfig::default(), bootstrap_cache: BootstrapCacheConfig::default(), + storage: StorageConfig::default(), log_level: default_log_level(), } } @@ -523,14 +528,58 @@ const fn default_bootstrap_stale_days() -> u64 { 7 } +// ============================================================================ +// Storage Configuration +// ============================================================================ + +/// Storage configuration for chunk persistence. +/// +/// Controls how chunks are stored on disk, including: +/// - Whether storage is enabled +/// - Maximum chunks to store (for capacity management) +/// - Content verification on read +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StorageConfig { + /// Enable chunk storage. + /// Default: true + #[serde(default = "default_storage_enabled")] + pub enabled: bool, + + /// Maximum number of chunks to store (0 = unlimited). + /// Default: 0 (unlimited) + #[serde(default)] + pub max_chunks: usize, + + /// Verify content hash matches address on read. + /// Default: true + #[serde(default = "default_storage_verify_on_read")] + pub verify_on_read: bool, +} + +impl Default for StorageConfig { + fn default() -> Self { + Self { + enabled: default_storage_enabled(), + max_chunks: 0, + verify_on_read: default_storage_verify_on_read(), + } + } +} + +const fn default_storage_enabled() -> bool { + true +} + +const fn default_storage_verify_on_read() -> bool { + true +} + /// Default testnet bootstrap nodes. /// /// These are well-known bootstrap nodes for the Saorsa testnet. 
/// - saorsa-bootstrap-1 (NYC): 165.22.4.178:12000 /// - saorsa-bootstrap-2 (SFO): 164.92.111.156:12000 fn default_testnet_bootstrap() -> Vec { - use std::net::{Ipv4Addr, SocketAddrV4}; - vec![ // saorsa-bootstrap-1 (Digital Ocean NYC1) SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(165, 22, 4, 178), 12000)), diff --git a/src/error.rs b/src/error.rs index 85ec9a5d..763ab7b9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -44,6 +44,14 @@ pub enum Error { #[error("serialization error: {0}")] Serialization(String), + /// Protocol error. + #[error("protocol error: {0}")] + Protocol(String), + + /// Invalid chunk error. + #[error("invalid chunk: {0}")] + InvalidChunk(String), + /// Node is shutting down. #[error("node is shutting down")] ShuttingDown, diff --git a/src/lib.rs b/src/lib.rs index 30142ede..85ed8c22 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ #![warn(clippy::all)] #![warn(clippy::pedantic)] +pub mod ant_protocol; pub mod attestation; pub mod client; pub mod config; @@ -48,11 +49,17 @@ pub mod node; pub mod payment; #[cfg(test)] mod probe; +pub mod storage; pub mod upgrade; -pub use client::{DataChunk, QuantumClient, QuantumConfig, XorName}; -pub use config::{BootstrapCacheConfig, NodeConfig}; +pub use ant_protocol::{ + ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, + ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE, +}; +pub use client::{compute_address, DataChunk, QuantumClient, QuantumConfig, XorName}; +pub use config::{BootstrapCacheConfig, NodeConfig, StorageConfig}; pub use error::{Error, Result}; pub use event::{NodeEvent, NodeEventsChannel}; pub use node::{NodeBuilder, RunningNode}; pub use payment::{PaymentStatus, PaymentVerifier, PaymentVerifierConfig}; +pub use storage::{AntProtocol, DiskStorage, DiskStorageConfig}; diff --git a/src/node.rs b/src/node.rs index c46dd305..312e31f8 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,22 +1,38 @@ //! 
Node implementation - thin wrapper around saorsa-core's `P2PNode`. +use crate::ant_protocol::CHUNK_PROTOCOL_ID; use crate::attestation::VerificationLevel; -use crate::config::{AttestationMode, AttestationNodeConfig, IpVersion, NetworkMode, NodeConfig}; +use crate::config::{ + AttestationMode, AttestationNodeConfig, EvmNetworkConfig, IpVersion, NetworkMode, NodeConfig, +}; use crate::error::{Error, Result}; use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender}; +use crate::payment::metrics::QuotingMetricsTracker; +use crate::payment::wallet::parse_rewards_address; +use crate::payment::{PaymentVerifier, PaymentVerifierConfig, QuoteGenerator}; +use crate::storage::{AntProtocol, DiskStorage, DiskStorageConfig}; use crate::upgrade::{AutoApplyUpgrader, UpgradeMonitor, UpgradeResult}; +use ant_evm::RewardsAddress; +use evmlib::Network as EvmNetwork; use saorsa_core::{ AttestationConfig as CoreAttestationConfig, BootstrapConfig as CoreBootstrapConfig, BootstrapManager, EnforcementMode as CoreEnforcementMode, - IPDiversityConfig as CoreDiversityConfig, NodeConfig as CoreNodeConfig, P2PNode, + IPDiversityConfig as CoreDiversityConfig, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, ProductionConfig as CoreProductionConfig, }; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::watch; +use tokio::task::JoinHandle; use tracing::{debug, error, info, warn}; +/// Maximum number of records for quoting metrics. +const DEFAULT_MAX_QUOTING_RECORDS: usize = 100_000; + +/// Default rewards address when none is configured (20-byte zero address). 
+const DEFAULT_REWARDS_ADDRESS: [u8; 20] = [0u8; 20]; + #[cfg(unix)] use tokio::signal::unix::{signal, SignalKind}; @@ -78,6 +94,14 @@ impl NodeBuilder { None }; + // Initialize ANT protocol handler for chunk storage + let ant_protocol = if self.config.storage.enabled { + Some(Arc::new(Self::build_ant_protocol(&self.config).await?)) + } else { + info!("Chunk storage disabled"); + None + }; + let node = RunningNode { config: self.config, p2p_node: Arc::new(p2p_node), @@ -87,6 +111,8 @@ impl NodeBuilder { events_rx: Some(events_rx), upgrade_monitor, bootstrap_manager, + ant_protocol, + protocol_task: None, }; Ok(node) @@ -272,6 +298,54 @@ impl NodeBuilder { } } + /// Build the ANT protocol handler from config. + /// + /// Initializes disk storage, payment verifier, and quote generator. + async fn build_ant_protocol(config: &NodeConfig) -> Result { + // Create disk storage + let storage_config = DiskStorageConfig { + root_dir: config.root_dir.clone(), + verify_on_read: config.storage.verify_on_read, + max_chunks: config.storage.max_chunks, + }; + let storage = DiskStorage::new(storage_config) + .await + .map_err(|e| Error::Startup(format!("Failed to create disk storage: {e}")))?; + + // Create payment verifier + let evm_network = match config.payment.evm_network { + EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne, + EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest, + }; + let payment_config = PaymentVerifierConfig { + evm: crate::payment::EvmVerifierConfig { + enabled: config.payment.enabled, + network: evm_network, + }, + cache_capacity: config.payment.cache_capacity, + }; + let payment_verifier = PaymentVerifier::new(payment_config); + + // Create quote generator + let rewards_address = match config.payment.rewards_address { + Some(ref addr) => parse_rewards_address(addr)?, + None => RewardsAddress::new(DEFAULT_REWARDS_ADDRESS), + }; + let metrics_tracker = QuotingMetricsTracker::new(DEFAULT_MAX_QUOTING_RECORDS, 0); + let quote_generator 
= QuoteGenerator::new(rewards_address, metrics_tracker); + + info!( + "ANT protocol handler initialized (protocol={})", + CHUNK_PROTOCOL_ID + ); + + Ok(AntProtocol::new( + Arc::new(storage), + Arc::new(payment_verifier), + Arc::new(quote_generator), + )) + } + /// Build the bootstrap cache manager from config. async fn build_bootstrap_manager(config: &NodeConfig) -> Option { let cache_dir = config @@ -319,6 +393,10 @@ pub struct RunningNode { upgrade_monitor: Option>, /// Bootstrap cache manager for persistent peer storage. bootstrap_manager: Option, + /// ANT protocol handler for chunk storage. + ant_protocol: Option>, + /// Protocol message routing background task. + protocol_task: Option>, } impl RunningNode { @@ -365,6 +443,9 @@ impl RunningNode { warn!("Failed to send Started event: {e}"); } + // Start protocol message routing (P2P → AntProtocol → P2P response) + self.start_protocol_routing(); + // Start upgrade monitor if enabled if let Some(ref monitor) = self.upgrade_monitor { let monitor = Arc::clone(monitor); @@ -442,6 +523,11 @@ impl RunningNode { } } + // Stop protocol routing task + if let Some(handle) = self.protocol_task.take() { + handle.abort(); + } + // Shutdown P2P node info!("Shutting down P2P node..."); if let Err(e) = self.p2p_node.shutdown().await { @@ -509,6 +595,56 @@ impl RunningNode { Ok(()) } + /// Start the protocol message routing background task. + /// + /// Subscribes to P2P events and routes incoming chunk protocol messages + /// to the `AntProtocol` handler, sending responses back to the sender. 
+ fn start_protocol_routing(&mut self) { + let protocol = match self.ant_protocol { + Some(ref p) => Arc::clone(p), + None => return, + }; + + let mut events = self.p2p_node.subscribe_events(); + let p2p = Arc::clone(&self.p2p_node); + + self.protocol_task = Some(tokio::spawn(async move { + while let Ok(event) = events.recv().await { + if let P2PEvent::Message { + topic, + source, + data, + } = event + { + if topic == CHUNK_PROTOCOL_ID { + debug!("Received chunk protocol message from {}", source); + let protocol = Arc::clone(&protocol); + let p2p = Arc::clone(&p2p); + tokio::spawn(async move { + match protocol.handle_message(&data).await { + Ok(response) => { + if let Err(e) = p2p + .send_message(&source, CHUNK_PROTOCOL_ID, response.to_vec()) + .await + { + warn!( + "Failed to send protocol response to {}: {}", + source, e + ); + } + } + Err(e) => { + warn!("Protocol handler error: {}", e); + } + } + }); + } + } + } + })); + info!("Protocol message routing started"); + } + /// Request the node to shut down. pub fn shutdown(&self) { if let Err(e) = self.shutdown_tx.send(true) { diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 9f7bae51..0f742875 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -37,8 +37,8 @@ pub mod quote; mod verifier; pub mod wallet; -pub use cache::VerifiedCache; +pub use cache::{CacheStats, VerifiedCache}; pub use metrics::QuotingMetricsTracker; pub use quote::{verify_quote_content, QuoteGenerator, XorName}; -pub use verifier::{PaymentStatus, PaymentVerifier, PaymentVerifierConfig}; +pub use verifier::{EvmVerifierConfig, PaymentStatus, PaymentVerifier, PaymentVerifierConfig}; pub use wallet::{is_valid_address, parse_rewards_address, WalletConfig}; diff --git a/src/storage/disk.rs b/src/storage/disk.rs new file mode 100644 index 00000000..a71e4513 --- /dev/null +++ b/src/storage/disk.rs @@ -0,0 +1,493 @@ +//! Content-addressed disk storage with sharded directories. +//! +//! 
Provides persistent storage for chunks using a two-level directory structure +//! to avoid large directory listings: +//! +//! ```text +//! {root}/chunks/{xx}/{yy}/{address}.chunk +//! ``` +//! +//! Where `xx` and `yy` are the first two bytes of the address in hex. + +use crate::ant_protocol::XorName; +use crate::error::{Error, Result}; +use std::path::{Path, PathBuf}; +use tokio::fs; +use tokio::io::AsyncWriteExt; +use tracing::{debug, trace, warn}; + +/// Configuration for disk storage. +#[derive(Debug, Clone)] +pub struct DiskStorageConfig { + /// Root directory for chunk storage. + pub root_dir: PathBuf, + /// Whether to verify content on read (compares hash to address). + pub verify_on_read: bool, + /// Maximum number of chunks to store (0 = unlimited). + pub max_chunks: usize, +} + +impl Default for DiskStorageConfig { + fn default() -> Self { + Self { + root_dir: PathBuf::from(".saorsa/chunks"), + verify_on_read: true, + max_chunks: 0, + } + } +} + +/// Statistics about storage operations. +#[derive(Debug, Clone, Default)] +pub struct StorageStats { + /// Total number of chunks stored. + pub chunks_stored: u64, + /// Total number of chunks retrieved. + pub chunks_retrieved: u64, + /// Total bytes stored. + pub bytes_stored: u64, + /// Total bytes retrieved. + pub bytes_retrieved: u64, + /// Number of duplicate writes (already exists). + pub duplicates: u64, + /// Number of verification failures on read. + pub verification_failures: u64, +} + +/// Content-addressed disk storage. +/// +/// Uses a sharded directory structure for efficient storage: +/// ```text +/// {root}/chunks/{xx}/{yy}/{address}.chunk +/// ``` +pub struct DiskStorage { + /// Storage configuration. + config: DiskStorageConfig, + /// Operation statistics. + stats: parking_lot::RwLock, +} + +impl DiskStorage { + /// Create a new disk storage instance. + /// + /// # Errors + /// + /// Returns an error if the root directory cannot be created. 
+ pub async fn new(config: DiskStorageConfig) -> Result { + // Ensure root directory exists + let chunks_dir = config.root_dir.join("chunks"); + fs::create_dir_all(&chunks_dir) + .await + .map_err(|e| Error::Storage(format!("Failed to create chunks directory: {e}")))?; + + debug!("Initialized disk storage at {:?}", config.root_dir); + + Ok(Self { + config, + stats: parking_lot::RwLock::new(StorageStats::default()), + }) + } + + /// Store a chunk. + /// + /// Uses atomic write (temp file + rename) for crash safety. + /// + /// # Arguments + /// + /// * `address` - Content address (should be SHA256 of content) + /// * `content` - Chunk data + /// + /// # Returns + /// + /// Returns `true` if the chunk was newly stored, `false` if it already existed. + /// + /// # Errors + /// + /// Returns an error if the write fails or content doesn't match address. + pub async fn put(&self, address: &XorName, content: &[u8]) -> Result { + // Verify content address + let computed = Self::compute_address(content); + if computed != *address { + return Err(Error::Storage(format!( + "Content address mismatch: expected {}, computed {}", + hex::encode(address), + hex::encode(computed) + ))); + } + + let chunk_path = self.chunk_path(address); + + // Check if already exists + if chunk_path.exists() { + trace!("Chunk {} already exists", hex::encode(address)); + { + let mut stats = self.stats.write(); + stats.duplicates += 1; + } + return Ok(false); + } + + // Enforce max_chunks capacity limit (0 = unlimited) + if self.config.max_chunks > 0 { + let chunks_stored = self.stats.read().chunks_stored; + if chunks_stored >= self.config.max_chunks as u64 { + return Err(Error::Storage(format!( + "Storage capacity reached: {} chunks stored, max is {}", + chunks_stored, self.config.max_chunks + ))); + } + } + + // Ensure parent directories exist + if let Some(parent) = chunk_path.parent() { + fs::create_dir_all(parent) + .await + .map_err(|e| Error::Storage(format!("Failed to create shard directory: 
{e}")))?; + } + + // Atomic write: temp file + rename + let temp_path = chunk_path.with_extension("tmp"); + let mut file = fs::File::create(&temp_path) + .await + .map_err(|e| Error::Storage(format!("Failed to create temp file: {e}")))?; + + file.write_all(content) + .await + .map_err(|e| Error::Storage(format!("Failed to write chunk: {e}")))?; + + file.flush() + .await + .map_err(|e| Error::Storage(format!("Failed to flush chunk: {e}")))?; + + // Rename for atomic commit + fs::rename(&temp_path, &chunk_path) + .await + .map_err(|e| Error::Storage(format!("Failed to rename temp file: {e}")))?; + + { + let mut stats = self.stats.write(); + stats.chunks_stored += 1; + stats.bytes_stored += content.len() as u64; + } + + debug!( + "Stored chunk {} ({} bytes)", + hex::encode(address), + content.len() + ); + + Ok(true) + } + + /// Retrieve a chunk. + /// + /// # Arguments + /// + /// * `address` - Content address to retrieve + /// + /// # Returns + /// + /// Returns `Some(content)` if found, `None` if not found. + /// + /// # Errors + /// + /// Returns an error if read fails or verification fails. 
+ pub async fn get(&self, address: &XorName) -> Result>> { + let chunk_path = self.chunk_path(address); + + if !chunk_path.exists() { + trace!("Chunk {} not found", hex::encode(address)); + return Ok(None); + } + + let content = fs::read(&chunk_path) + .await + .map_err(|e| Error::Storage(format!("Failed to read chunk: {e}")))?; + + // Verify content if configured + if self.config.verify_on_read { + let computed = Self::compute_address(&content); + if computed != *address { + { + let mut stats = self.stats.write(); + stats.verification_failures += 1; + } + warn!( + "Chunk verification failed: expected {}, computed {}", + hex::encode(address), + hex::encode(computed) + ); + return Err(Error::Storage(format!( + "Chunk verification failed for {}", + hex::encode(address) + ))); + } + } + + { + let mut stats = self.stats.write(); + stats.chunks_retrieved += 1; + stats.bytes_retrieved += content.len() as u64; + } + + debug!( + "Retrieved chunk {} ({} bytes)", + hex::encode(address), + content.len() + ); + + Ok(Some(content)) + } + + /// Check if a chunk exists. + #[must_use] + pub fn exists(&self, address: &XorName) -> bool { + self.chunk_path(address).exists() + } + + /// Delete a chunk. + /// + /// # Errors + /// + /// Returns an error if deletion fails. + pub async fn delete(&self, address: &XorName) -> Result { + let chunk_path = self.chunk_path(address); + + if !chunk_path.exists() { + return Ok(false); + } + + fs::remove_file(&chunk_path) + .await + .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?; + + debug!("Deleted chunk {}", hex::encode(address)); + + Ok(true) + } + + /// Get storage statistics. + #[must_use] + pub fn stats(&self) -> StorageStats { + self.stats.read().clone() + } + + /// Get the path for a chunk. 
+ fn chunk_path(&self, address: &XorName) -> PathBuf { + // Two-level sharding using first two bytes + let shard1 = format!("{:02x}", address[0]); + let shard2 = format!("{:02x}", address[1]); + let filename = format!("{}.chunk", hex::encode(address)); + + self.config + .root_dir + .join("chunks") + .join(shard1) + .join(shard2) + .join(filename) + } + + /// Compute content address (SHA256 hash). + #[must_use] + pub fn compute_address(content: &[u8]) -> XorName { + crate::client::compute_address(content) + } + + /// Get the root directory. + #[must_use] + pub fn root_dir(&self) -> &Path { + &self.config.root_dir + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + use tempfile::TempDir; + + async fn create_test_storage() -> (DiskStorage, TempDir) { + let temp_dir = TempDir::new().expect("create temp dir"); + let config = DiskStorageConfig { + root_dir: temp_dir.path().to_path_buf(), + verify_on_read: true, + max_chunks: 0, + }; + let storage = DiskStorage::new(config).await.expect("create storage"); + (storage, temp_dir) + } + + #[tokio::test] + async fn test_put_and_get() { + let (storage, _temp) = create_test_storage().await; + + let content = b"hello world"; + let address = DiskStorage::compute_address(content); + + // Store chunk + let is_new = storage.put(&address, content).await.expect("put"); + assert!(is_new); + + // Retrieve chunk + let retrieved = storage.get(&address).await.expect("get"); + assert_eq!(retrieved, Some(content.to_vec())); + } + + #[tokio::test] + async fn test_put_duplicate() { + let (storage, _temp) = create_test_storage().await; + + let content = b"test data"; + let address = DiskStorage::compute_address(content); + + // First store + let is_new1 = storage.put(&address, content).await.expect("put 1"); + assert!(is_new1); + + // Duplicate store + let is_new2 = storage.put(&address, content).await.expect("put 2"); + assert!(!is_new2); + + // Check stats + let stats = storage.stats(); + 
assert_eq!(stats.chunks_stored, 1); + assert_eq!(stats.duplicates, 1); + } + + #[tokio::test] + async fn test_get_not_found() { + let (storage, _temp) = create_test_storage().await; + + let address = [0xAB; 32]; + let result = storage.get(&address).await.expect("get"); + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_exists() { + let (storage, _temp) = create_test_storage().await; + + let content = b"exists test"; + let address = DiskStorage::compute_address(content); + + assert!(!storage.exists(&address)); + + storage.put(&address, content).await.expect("put"); + + assert!(storage.exists(&address)); + } + + #[tokio::test] + async fn test_delete() { + let (storage, _temp) = create_test_storage().await; + + let content = b"delete test"; + let address = DiskStorage::compute_address(content); + + // Store + storage.put(&address, content).await.expect("put"); + assert!(storage.exists(&address)); + + // Delete + let deleted = storage.delete(&address).await.expect("delete"); + assert!(deleted); + assert!(!storage.exists(&address)); + + // Delete again (already deleted) + let deleted2 = storage.delete(&address).await.expect("delete 2"); + assert!(!deleted2); + } + + #[tokio::test] + async fn test_max_chunks_enforced() { + let temp_dir = TempDir::new().expect("create temp dir"); + let config = DiskStorageConfig { + root_dir: temp_dir.path().to_path_buf(), + verify_on_read: true, + max_chunks: 2, + }; + let storage = DiskStorage::new(config).await.expect("create storage"); + + let content1 = b"chunk one"; + let content2 = b"chunk two"; + let content3 = b"chunk three"; + let addr1 = DiskStorage::compute_address(content1); + let addr2 = DiskStorage::compute_address(content2); + let addr3 = DiskStorage::compute_address(content3); + + // First two should succeed + assert!(storage.put(&addr1, content1).await.is_ok()); + assert!(storage.put(&addr2, content2).await.is_ok()); + + // Third should be rejected + let result = storage.put(&addr3, content3).await; + 
assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("capacity reached")); + } + + #[tokio::test] + async fn test_address_mismatch() { + let (storage, _temp) = create_test_storage().await; + + let content = b"some content"; + let wrong_address = [0xFF; 32]; // Wrong address + + let result = storage.put(&wrong_address, content).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("mismatch")); + } + + #[tokio::test] + async fn test_chunk_path_sharding() { + let (storage, _temp) = create_test_storage().await; + + // Address starting with 0xAB, 0xCD... + let mut address = [0u8; 32]; + address[0] = 0xAB; + address[1] = 0xCD; + + let path = storage.chunk_path(&address); + let path_str = path.to_string_lossy(); + + // Should contain sharded directories + assert!(path_str.contains("ab")); + assert!(path_str.contains("cd")); + assert!(path_str.ends_with(".chunk")); + } + + #[test] + fn test_compute_address() { + // Known SHA256 hash of "hello world" + let content = b"hello world"; + let address = DiskStorage::compute_address(content); + + let expected_hex = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"; + assert_eq!(hex::encode(address), expected_hex); + } + + #[tokio::test] + async fn test_stats() { + let (storage, _temp) = create_test_storage().await; + + let content1 = b"content 1"; + let content2 = b"content 2"; + let address1 = DiskStorage::compute_address(content1); + let address2 = DiskStorage::compute_address(content2); + + // Store two chunks + storage.put(&address1, content1).await.expect("put 1"); + storage.put(&address2, content2).await.expect("put 2"); + + // Retrieve one + storage.get(&address1).await.expect("get"); + + let stats = storage.stats(); + assert_eq!(stats.chunks_stored, 2); + assert_eq!(stats.chunks_retrieved, 1); + assert_eq!( + stats.bytes_stored, + content1.len() as u64 + content2.len() as u64 + ); + assert_eq!(stats.bytes_retrieved, content1.len() as u64); + } +} 
diff --git a/src/storage/handler.rs b/src/storage/handler.rs new file mode 100644 index 00000000..e13da869 --- /dev/null +++ b/src/storage/handler.rs @@ -0,0 +1,569 @@ +//! ANT protocol handler for autonomi protocol messages. +//! +//! This handler processes chunk PUT/GET requests with optional payment verification, +//! storing chunks to disk and using the DHT for network-wide retrieval. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────┐ +//! │ AntProtocol │ +//! ├─────────────────────────────────────────────────────────┤ +//! │ protocol_id() = "saorsa/ant/chunk/v1" │ +//! │ │ +//! │ handle_message(data) ──▶ decode ChunkMessage │ +//! │ │ │ +//! │ ┌─────────────────────────┼─────────────────┐ │ +//! │ ▼ ▼ ▼ │ +//! │ ChunkQuoteRequest ChunkPutRequest ChunkGetRequest +//! │ │ │ │ │ +//! │ ▼ ▼ ▼ │ +//! │ QuoteGenerator PaymentVerifier DiskStorage│ +//! │ │ │ │ │ +//! │ └─────────────────────────┴─────────────────┘ │ +//! │ │ │ +//! │ return Ok(response_bytes) │ +//! └─────────────────────────────────────────────────────────┘ +//! ``` + +use crate::ant_protocol::{ + ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, + ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, + DATA_TYPE_CHUNK, MAX_CHUNK_SIZE, +}; +use crate::error::Result; +use crate::payment::{PaymentVerifier, QuoteGenerator}; +use crate::storage::disk::DiskStorage; +use bytes::Bytes; +use std::sync::Arc; +use tracing::{debug, info, warn}; + +/// ANT protocol handler. +/// +/// Handles chunk PUT/GET/Quote requests using disk storage for persistence +/// and optional payment verification. +pub struct AntProtocol { + /// Disk storage for chunk persistence. + storage: Arc, + /// Payment verifier for checking payments. + payment_verifier: Arc, + /// Quote generator for creating storage quotes. + quote_generator: Arc, +} + +impl AntProtocol { + /// Create a new ANT protocol handler. 
+ /// + /// # Arguments + /// + /// * `storage` - Disk storage for chunk persistence + /// * `payment_verifier` - Payment verifier for validating payments + /// * `quote_generator` - Quote generator for creating storage quotes + #[must_use] + pub fn new( + storage: Arc, + payment_verifier: Arc, + quote_generator: Arc, + ) -> Self { + Self { + storage, + payment_verifier, + quote_generator, + } + } + + /// Get the protocol identifier. + #[must_use] + pub fn protocol_id(&self) -> &'static str { + CHUNK_PROTOCOL_ID + } + + /// Handle an incoming protocol message. + /// + /// # Arguments + /// + /// * `data` - Raw message bytes + /// + /// # Returns + /// + /// Response bytes, or an error if handling fails. + /// + /// # Errors + /// + /// Returns an error if message decoding or handling fails. + pub async fn handle_message(&self, data: &[u8]) -> Result { + let message = ChunkMessage::decode(data) + .map_err(|e| crate::error::Error::Protocol(format!("Failed to decode message: {e}")))?; + + let request_id = message.request_id; + + let response_body = match message.body { + ChunkMessageBody::PutRequest(req) => { + ChunkMessageBody::PutResponse(self.handle_put(req).await) + } + ChunkMessageBody::GetRequest(req) => { + ChunkMessageBody::GetResponse(self.handle_get(req).await) + } + ChunkMessageBody::QuoteRequest(ref req) => { + ChunkMessageBody::QuoteResponse(self.handle_quote(req)) + } + // Response messages shouldn't be received as requests + ChunkMessageBody::PutResponse(_) + | ChunkMessageBody::GetResponse(_) + | ChunkMessageBody::QuoteResponse(_) => { + let error = ProtocolError::Internal("Unexpected response message".to_string()); + ChunkMessageBody::PutResponse(ChunkPutResponse::Error(error)) + } + }; + + let response = ChunkMessage { + request_id, + body: response_body, + }; + + response + .encode() + .map(Bytes::from) + .map_err(|e| crate::error::Error::Protocol(format!("Failed to encode response: {e}"))) + } + + /// Handle a PUT request. 
+ async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse { + let address = request.address; + debug!("Handling PUT request for {}", hex::encode(address)); + + // 1. Validate chunk size + if request.content.len() > MAX_CHUNK_SIZE { + return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge { + size: request.content.len(), + max_size: MAX_CHUNK_SIZE, + }); + } + + // 2. Verify content address matches SHA256(content) + let computed = crate::client::compute_address(&request.content); + if computed != address { + return ChunkPutResponse::Error(ProtocolError::AddressMismatch { + expected: address, + actual: computed, + }); + } + + // 3. Check if already exists (idempotent success) + if self.storage.exists(&address) { + debug!("Chunk {} already exists", hex::encode(address)); + return ChunkPutResponse::AlreadyExists { address }; + } + + // 4. Verify payment + let payment_result = self + .payment_verifier + .verify_payment(&address, request.payment_proof.as_deref()) + .await; + + match payment_result { + Ok(status) if status.can_store() => { + // Payment verified or cached + } + Ok(_) => { + return ChunkPutResponse::PaymentRequired { + message: "Payment required for new chunk".to_string(), + }; + } + Err(e) => { + return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string())); + } + } + + // 5. Store to disk + match self.storage.put(&address, &request.content).await { + Ok(_) => { + info!( + "Stored chunk {} ({} bytes)", + hex::encode(address), + request.content.len() + ); + // Record the store in metrics + self.quote_generator.record_store(DATA_TYPE_CHUNK); + ChunkPutResponse::Success { address } + } + Err(e) => { + warn!("Failed to store chunk {}: {}", hex::encode(address), e); + ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string())) + } + } + } + + /// Handle a GET request. 
+ async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse { + let address = request.address; + debug!("Handling GET request for {}", hex::encode(address)); + + match self.storage.get(&address).await { + Ok(Some(content)) => { + debug!( + "Retrieved chunk {} ({} bytes)", + hex::encode(address), + content.len() + ); + ChunkGetResponse::Success { address, content } + } + Ok(None) => { + debug!("Chunk {} not found", hex::encode(address)); + ChunkGetResponse::NotFound { address } + } + Err(e) => { + warn!("Failed to retrieve chunk {}: {}", hex::encode(address), e); + ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string())) + } + } + } + + /// Handle a quote request. + fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { + debug!( + "Handling quote request for {} (size: {})", + hex::encode(request.address), + request.data_size + ); + + // Validate data size - data_size is u64, cast carefully + let data_size_usize = usize::try_from(request.data_size).unwrap_or(usize::MAX); + if data_size_usize > MAX_CHUNK_SIZE { + return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge { + size: data_size_usize, + max_size: MAX_CHUNK_SIZE, + }); + } + + match self + .quote_generator + .create_quote(request.address, data_size_usize, request.data_type) + { + Ok(quote) => { + // Serialize the quote + match rmp_serde::to_vec("e) { + Ok(quote_bytes) => ChunkQuoteResponse::Success { quote: quote_bytes }, + Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!( + "Failed to serialize quote: {e}" + ))), + } + } + Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())), + } + } + + /// Get storage statistics. + #[must_use] + pub fn storage_stats(&self) -> crate::storage::StorageStats { + self.storage.stats() + } + + /// Get payment cache statistics. 
+ #[must_use] + pub fn payment_cache_stats(&self) -> crate::payment::CacheStats { + self.payment_verifier.cache_stats() + } + + /// Check if a chunk exists locally. + #[must_use] + pub fn exists(&self, address: &[u8; 32]) -> bool { + self.storage.exists(address) + } + + /// Get a chunk directly from local storage. + /// + /// # Errors + /// + /// Returns an error if storage access fails. + pub async fn get_local(&self, address: &[u8; 32]) -> Result>> { + self.storage.get(address).await + } + + /// Store a chunk directly to local storage (bypasses payment verification). + /// + /// This is useful for testing or when payment has been verified elsewhere. + /// + /// # Errors + /// + /// Returns an error if storage fails or content doesn't match address. + pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result { + self.storage.put(address, content).await + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +mod tests { + use super::*; + use crate::payment::metrics::QuotingMetricsTracker; + use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig}; + use crate::storage::DiskStorageConfig; + use ant_evm::RewardsAddress; + use tempfile::TempDir; + + async fn create_test_protocol() -> (AntProtocol, TempDir) { + let temp_dir = TempDir::new().expect("create temp dir"); + + let storage_config = DiskStorageConfig { + root_dir: temp_dir.path().to_path_buf(), + verify_on_read: true, + max_chunks: 0, + }; + let storage = Arc::new( + DiskStorage::new(storage_config) + .await + .expect("create storage"), + ); + + let payment_config = PaymentVerifierConfig { + evm: EvmVerifierConfig { + enabled: false, // Disable EVM for tests + ..Default::default() + }, + cache_capacity: 100, + }; + let payment_verifier = Arc::new(PaymentVerifier::new(payment_config)); + + let rewards_address = RewardsAddress::new([1u8; 20]); + let metrics_tracker = QuotingMetricsTracker::new(1000, 100); + let quote_generator = 
Arc::new(QuoteGenerator::new(rewards_address, metrics_tracker)); + + let protocol = AntProtocol::new(storage, payment_verifier, quote_generator); + (protocol, temp_dir) + } + + #[tokio::test] + async fn test_put_and_get_chunk() { + let (protocol, _temp) = create_test_protocol().await; + + let content = b"hello world"; + let address = DiskStorage::compute_address(content); + + // Create PUT request - with empty payment proof (EVM disabled) + let put_request = ChunkPutRequest::with_payment( + address, + content.to_vec(), + rmp_serde::to_vec(&ant_evm::ProofOfPayment { + peer_quotes: vec![], + }) + .unwrap(), + ); + let put_msg = ChunkMessage { + request_id: 1, + body: ChunkMessageBody::PutRequest(put_request), + }; + let put_bytes = put_msg.encode().expect("encode put"); + + // Handle PUT + let response_bytes = protocol + .handle_message(&put_bytes) + .await + .expect("handle put"); + let response = ChunkMessage::decode(&response_bytes).expect("decode response"); + + assert_eq!(response.request_id, 1); + if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) = + response.body + { + assert_eq!(addr, address); + } else { + panic!("expected PutResponse::Success, got: {response:?}"); + } + + // Create GET request + let get_request = ChunkGetRequest::new(address); + let get_msg = ChunkMessage { + request_id: 2, + body: ChunkMessageBody::GetRequest(get_request), + }; + let get_bytes = get_msg.encode().expect("encode get"); + + // Handle GET + let response_bytes = protocol + .handle_message(&get_bytes) + .await + .expect("handle get"); + let response = ChunkMessage::decode(&response_bytes).expect("decode response"); + + assert_eq!(response.request_id, 2); + if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success { + address: addr, + content: data, + }) = response.body + { + assert_eq!(addr, address); + assert_eq!(data, content.to_vec()); + } else { + panic!("expected GetResponse::Success"); + } + } + + #[tokio::test] + async fn 
test_get_not_found() { + let (protocol, _temp) = create_test_protocol().await; + + let address = [0xAB; 32]; + let get_request = ChunkGetRequest::new(address); + let get_msg = ChunkMessage { + request_id: 10, + body: ChunkMessageBody::GetRequest(get_request), + }; + let get_bytes = get_msg.encode().expect("encode get"); + + let response_bytes = protocol + .handle_message(&get_bytes) + .await + .expect("handle get"); + let response = ChunkMessage::decode(&response_bytes).expect("decode response"); + + assert_eq!(response.request_id, 10); + if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) = + response.body + { + assert_eq!(addr, address); + } else { + panic!("expected GetResponse::NotFound"); + } + } + + #[tokio::test] + async fn test_put_address_mismatch() { + let (protocol, _temp) = create_test_protocol().await; + + let content = b"test content"; + let wrong_address = [0xFF; 32]; // Wrong address + + let put_request = ChunkPutRequest::with_payment( + wrong_address, + content.to_vec(), + rmp_serde::to_vec(&ant_evm::ProofOfPayment { + peer_quotes: vec![], + }) + .unwrap(), + ); + let put_msg = ChunkMessage { + request_id: 20, + body: ChunkMessageBody::PutRequest(put_request), + }; + let put_bytes = put_msg.encode().expect("encode put"); + + let response_bytes = protocol + .handle_message(&put_bytes) + .await + .expect("handle put"); + let response = ChunkMessage::decode(&response_bytes).expect("decode response"); + + assert_eq!(response.request_id, 20); + if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error( + ProtocolError::AddressMismatch { .. 
}, + )) = response.body + { + // Expected + } else { + panic!("expected AddressMismatch error, got: {response:?}"); + } + } + + #[tokio::test] + async fn test_put_chunk_too_large() { + let (protocol, _temp) = create_test_protocol().await; + + // Create oversized content + let content = vec![0u8; MAX_CHUNK_SIZE + 1]; + let address = DiskStorage::compute_address(&content); + + let put_request = ChunkPutRequest::new(address, content); + let put_msg = ChunkMessage { + request_id: 30, + body: ChunkMessageBody::PutRequest(put_request), + }; + let put_bytes = put_msg.encode().expect("encode put"); + + let response_bytes = protocol + .handle_message(&put_bytes) + .await + .expect("handle put"); + let response = ChunkMessage::decode(&response_bytes).expect("decode response"); + + assert_eq!(response.request_id, 30); + if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error( + ProtocolError::ChunkTooLarge { .. }, + )) = response.body + { + // Expected + } else { + panic!("expected ChunkTooLarge error"); + } + } + + #[tokio::test] + async fn test_put_already_exists() { + let (protocol, _temp) = create_test_protocol().await; + + let content = b"duplicate content"; + let address = DiskStorage::compute_address(content); + + // Store first time + let put_request = ChunkPutRequest::with_payment( + address, + content.to_vec(), + rmp_serde::to_vec(&ant_evm::ProofOfPayment { + peer_quotes: vec![], + }) + .unwrap(), + ); + let put_msg = ChunkMessage { + request_id: 40, + body: ChunkMessageBody::PutRequest(put_request), + }; + let put_bytes = put_msg.encode().expect("encode put"); + + let _ = protocol + .handle_message(&put_bytes) + .await + .expect("handle put"); + + // Store again - should return AlreadyExists + let response_bytes = protocol + .handle_message(&put_bytes) + .await + .expect("handle put 2"); + let response = ChunkMessage::decode(&response_bytes).expect("decode response"); + + assert_eq!(response.request_id, 40); + if let 
ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) = + response.body + { + assert_eq!(addr, address); + } else { + panic!("expected AlreadyExists"); + } + } + + #[tokio::test] + async fn test_protocol_id() { + let (protocol, _temp) = create_test_protocol().await; + assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID); + } + + #[tokio::test] + async fn test_exists_and_local_access() { + let (protocol, _temp) = create_test_protocol().await; + + let content = b"local access test"; + let address = DiskStorage::compute_address(content); + + assert!(!protocol.exists(&address)); + + protocol + .put_local(&address, content) + .await + .expect("put local"); + + assert!(protocol.exists(&address)); + + let retrieved = protocol.get_local(&address).await.expect("get local"); + assert_eq!(retrieved, Some(content.to_vec())); + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 00000000..56a2f195 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,51 @@ +//! Storage subsystem for chunk persistence. +//! +//! This module provides content-addressed disk storage for chunks, +//! along with a protocol handler that integrates with saorsa-core's +//! `Protocol` trait for automatic message routing. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────┐ +//! │ AntProtocol (implements Protocol trait) │ +//! ├─────────────────────────────────────────────────────────┤ +//! │ protocol_id() = "saorsa/ant/chunk/v1" │ +//! │ │ +//! │ handle(peer_id, data) ──▶ decode AntProtocolMessage │ +//! │ │ │ +//! │ ┌─────────────────────────┼─────────────────┐ │ +//! │ ▼ ▼ ▼ │ +//! │ QuoteRequest ChunkPutRequest ChunkGetRequest +//! │ │ │ │ │ +//! │ ▼ ▼ ▼ │ +//! │ QuoteGenerator PaymentVerifier DiskStorage│ +//! │ │ │ │ │ +//! │ └─────────────────────────┴─────────────────┘ │ +//! │ │ │ +//! │ return Ok(Some(response_bytes)) │ +//! 
└─────────────────────────────────────────────────────────┘ +//! ``` +//! +//! # Example +//! +//! ```rust,ignore +//! use saorsa_node::storage::{AntProtocol, DiskStorage, DiskStorageConfig}; +//! +//! // Create storage +//! let config = DiskStorageConfig::default(); +//! let storage = DiskStorage::new(config).await?; +//! +//! // Create protocol handler +//! let protocol = AntProtocol::new(storage, payment_verifier, quote_generator); +//! +//! // Register with saorsa-core +//! listener.register_protocol(protocol).await?; +//! ``` + +mod disk; +mod handler; + +pub use crate::ant_protocol::XorName; +pub use disk::{DiskStorage, DiskStorageConfig, StorageStats}; +pub use handler::AntProtocol; diff --git a/tests/e2e/data_types/chunk.rs b/tests/e2e/data_types/chunk.rs index 1a63ef78..43d6153b 100644 --- a/tests/e2e/data_types/chunk.rs +++ b/tests/e2e/data_types/chunk.rs @@ -14,8 +14,6 @@ #![allow(clippy::unwrap_used, clippy::expect_used)] -use sha2::{Digest, Sha256}; - use super::{TestData, MAX_CHUNK_SIZE}; /// Size of small test data (1KB). @@ -55,12 +53,7 @@ impl ChunkTestFixture { /// Compute content address for data (SHA256 hash). #[must_use] pub fn compute_address(data: &[u8]) -> [u8; 32] { - let mut hasher = Sha256::new(); - hasher.update(data); - let hash = hasher.finalize(); - let mut address = [0u8; 32]; - address.copy_from_slice(&hash); - address + saorsa_node::compute_address(data) } } @@ -68,6 +61,7 @@ impl ChunkTestFixture { mod tests { use super::*; use crate::TestHarness; + use rand::seq::SliceRandom; /// Test 1: Content address computation is deterministic #[test] @@ -132,7 +126,7 @@ mod tests { /// 4. Verifies data integrity /// /// Note: Cross-node retrieval is tested separately in `test_chunk_replication`. - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_chunk_store_retrieve_small() { let harness = TestHarness::setup_minimal() .await @@ -181,7 +175,7 @@ mod tests { } /// Test 7: Store and retrieve large chunk (4MB max). 
- #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_chunk_store_retrieve_large() { let harness = TestHarness::setup_minimal() .await @@ -212,11 +206,78 @@ mod tests { .expect("Failed to teardown harness"); } + // ========================================================================= + // Cross-Node Tests (require P2P network) + // ========================================================================= + + /// Test 8: One node asks another to store a chunk via P2P. + /// + /// This test validates the full cross-node protocol flow: + /// 1. Spins up a minimal 5-node local testnet + /// 2. A regular node (node 3) discovers connected peers + /// 3. Picks a random peer and sends a `ChunkPutRequest` to it + /// 4. The target node stores the chunk and responds with success + /// 5. The regular node then sends a `ChunkGetRequest` to retrieve it + /// 6. Verifies the data round-trips correctly + #[tokio::test(flavor = "multi_thread")] + async fn test_chunk_store_on_remote_node() { + let harness = TestHarness::setup_minimal() + .await + .expect("Failed to setup test harness"); + + let fixture = ChunkTestFixture::new(); + + // Node 3 (regular) discovers its connected peers and picks a random one + let requester = harness.test_node(3).expect("Node 3 should exist"); + let peers = requester.connected_peers().await; + assert!( + !peers.is_empty(), + "Node 3 should have at least one connected peer" + ); + + let mut rng = rand::thread_rng(); + let target_peer_id = peers.choose(&mut rng).expect("peers is non-empty"); + + let address = requester + .store_chunk_on_peer(target_peer_id, &fixture.small) + .await + .expect("Failed to store chunk on remote node"); + + // Verify the returned address matches the expected content hash + let expected_address = ChunkTestFixture::compute_address(&fixture.small); + assert_eq!( + address, expected_address, + "Returned address should match computed content address" + ); + + // Retrieve the chunk back from the same remote 
peer via P2P + let retrieved = requester + .get_chunk_from_peer(target_peer_id, &address) + .await + .expect("Failed to retrieve chunk from remote node"); + + let chunk = retrieved.expect("Chunk should exist on remote storage node"); + assert_eq!( + chunk.content.as_ref(), + fixture.small.as_slice(), + "Retrieved data should match original" + ); + assert_eq!( + chunk.address, address, + "Chunk address should match the stored address" + ); + + harness + .teardown() + .await + .expect("Failed to teardown harness"); + } + // ========================================================================= // Tests requiring additional infrastructure (not yet implemented) // ========================================================================= - /// Test 8: Chunk replication across nodes. + /// Test 9: Chunk replication across nodes. /// /// Store on one node, retrieve from a different node. #[test] @@ -242,7 +303,7 @@ mod tests { /// /// Chunks have a maximum size of 4MB. Attempting to store a larger /// chunk should fail with an appropriate error. - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_chunk_reject_oversized() { let harness = TestHarness::setup_minimal() .await diff --git a/tests/e2e/integration_tests.rs b/tests/e2e/integration_tests.rs index d1cfa99c..8105ba0a 100644 --- a/tests/e2e/integration_tests.rs +++ b/tests/e2e/integration_tests.rs @@ -9,6 +9,10 @@ use super::testnet::{ SMALL_NODE_COUNT, TEST_PORT_RANGE_MAX, TEST_PORT_RANGE_MIN, }; use super::{NetworkState, TestHarness, TestNetwork, TestNetworkConfig}; +use bytes::Bytes; +use saorsa_core::P2PEvent; +use saorsa_node::client::{QuantumClient, QuantumConfig}; +use std::sync::Arc; use std::time::Duration; /// Test that a minimal network (5 nodes) can form and stabilize. 
@@ -184,3 +188,190 @@ fn test_config_presets() { assert_ne!(default.test_data_dir, minimal.test_data_dir); assert_ne!(minimal.test_data_dir, small.test_data_dir); } + +/// Test that a node can send a message to a connected peer and the peer +/// receives it. +/// +/// This validates the fundamental `send_message` / `subscribe_events` layer +/// that all higher-level protocols (chunk, etc.) are built on. +#[tokio::test(flavor = "multi_thread")] +async fn test_node_to_node_messaging() { + const TEST_TOPIC: &str = "test/ping/v1"; + const PAYLOAD: &[u8] = b"hello from node 3"; + // With the reduced transport recv timeout (2s), messages should arrive + // within a few seconds. 10s gives ample headroom. + const DELIVERY_TIMEOUT_SECS: u64 = 10; + + let harness = TestHarness::setup_minimal() + .await + .expect("Failed to setup test harness"); + + // Subscribe on every node's event stream *before* sending, so we can + // confirm exactly which node receives the message. + let all_nodes = harness.all_nodes(); + let mut all_rx: Vec<_> = all_nodes.iter().map(|n| n.subscribe_events()).collect(); + + // Pick node 3 (regular) and send to one of its connected peers. + let sender = harness.test_node(3).expect("Node 3 should exist"); + let peers = sender.connected_peers().await; + assert!( + !peers.is_empty(), + "Node 3 should have at least one connected peer" + ); + let target_peer_id = peers[0].clone(); + + let sender_p2p = sender.p2p_node.as_ref().expect("Node 3 should be running"); + + sender_p2p + .send_message(&target_peer_id, TEST_TOPIC, PAYLOAD.to_vec()) + .await + .expect("Failed to send message to connected peer"); + + // Poll all event streams until the message arrives or we time out. + // Note: P2PEvent::Message.source is a transport-level peer ID (hex), + // which differs from P2PNode::peer_id() (app-level "peer_…" format), + // so we match on topic + payload instead of source identity. 
+ let mut received = false; + let deadline = tokio::time::Instant::now() + Duration::from_secs(DELIVERY_TIMEOUT_SECS); + + // Race all receivers concurrently instead of polling sequentially. + // Pin the deadline sleep once so the same timer future can be polled + // by `&mut` reference across loop iterations — `select!` needs a pinned + // future here, and `sleep_until` uses an absolute deadline, so the + // timeout still fires at `deadline` regardless of how many iterations run. + let timeout = tokio::time::sleep_until(deadline); + tokio::pin!(timeout); + + while !received { + let futs: Vec<_> = all_rx + .iter_mut() + .enumerate() + .map(|(i, rx)| Box::pin(async move { (i, rx.recv().await) })) + .collect(); + + tokio::select! { + (result, idx, _remaining) = futures::future::select_all(futs) => { + if let (_, Ok(P2PEvent::Message { ref topic, ref data, .. })) = (idx, &result.1) { + if topic == TEST_TOPIC && data.as_slice() == PAYLOAD { + received = true; + } + } + } + () = &mut timeout => { + break; + } + } + } + + assert!( + received, + "No node received the message on topic '{TEST_TOPIC}'" + ); + + // Drop event subscribers before teardown — holding broadcast receivers + // can prevent background tasks from terminating during shutdown. + drop(all_rx); + drop(all_nodes); + + harness + .teardown() + .await + .expect("Failed to teardown test harness"); +} + +/// Test that `QuantumClient` can store and retrieve chunks through the ANT +/// protocol on a live testnet. +/// +/// This validates the full client→P2P→protocol handler→disk storage round-trip: +/// 1. Spin up a minimal 5-node testnet +/// 2. Create a `QuantumClient` connected to a regular node +/// 3. Store a chunk via `put_chunk()` (sends `ChunkPutRequest` over P2P) +/// 4. Retrieve it via `get_chunk()` (sends `ChunkGetRequest` over P2P) +/// 5. Verify existence via `exists()` +/// 6. 
Confirm a missing chunk returns `None` / `false` +#[tokio::test(flavor = "multi_thread")] +async fn test_quantum_client_chunk_round_trip() { + /// Timeout tuned for local testnet; the default 30s is generous but + /// avoids flakes in CI where node startup is slow. + const CLIENT_TIMEOUT_SECS: u64 = 30; + + let harness = TestHarness::setup_minimal() + .await + .expect("Failed to setup test harness"); + + // Pick a regular node (node 3) and wire a QuantumClient to it + let node = harness.node(3).expect("Node 3 should exist"); + + let config = QuantumConfig { + timeout_secs: CLIENT_TIMEOUT_SECS, + ..Default::default() + }; + let client = QuantumClient::new(config).with_node(Arc::clone(&node)); + + // ── PUT ────────────────────────────────────────────────────────────── + let content = Bytes::from("quantum client e2e test payload"); + let address = client + .put_chunk(content.clone()) + .await + .expect("QuantumClient::put_chunk should succeed"); + + // Address must equal SHA256(content) + let expected_address = saorsa_node::compute_address(&content); + assert_eq!( + address, expected_address, + "put_chunk should return the content-addressed hash" + ); + + // ── GET ────────────────────────────────────────────────────────────── + let retrieved = client + .get_chunk(&address) + .await + .expect("QuantumClient::get_chunk should succeed"); + + let chunk = retrieved.expect("Chunk should be found after storing it"); + assert_eq!( + chunk.content.as_ref(), + content.as_ref(), + "Retrieved content should match what was stored" + ); + assert_eq!( + chunk.address, address, + "Retrieved address should match the stored address" + ); + + // ── EXISTS ─────────────────────────────────────────────────────────── + let found = client + .exists(&address) + .await + .expect("QuantumClient::exists should succeed"); + assert!(found, "exists() should return true for a stored chunk"); + + // ── GET missing ────────────────────────────────────────────────────── + let missing_address = 
[0xDE; 32]; + let missing = client + .get_chunk(&missing_address) + .await + .expect("get_chunk for missing address should not error"); + assert!( + missing.is_none(), + "get_chunk should return None for a non-existent chunk" + ); + + // ── EXISTS missing ─────────────────────────────────────────────────── + let missing_exists = client + .exists(&missing_address) + .await + .expect("exists for missing address should not error"); + assert!( + !missing_exists, + "exists() should return false for a non-existent chunk" + ); + + // Drop the client (and its event subscriptions) before teardown + drop(client); + drop(node); + + harness + .teardown() + .await + .expect("Failed to teardown test harness"); +} diff --git a/tests/e2e/live_testnet.rs b/tests/e2e/live_testnet.rs index 38e9995d..78c2fd4f 100644 --- a/tests/e2e/live_testnet.rs +++ b/tests/e2e/live_testnet.rs @@ -13,7 +13,6 @@ )] use saorsa_core::{NodeConfig as CoreNodeConfig, P2PNode}; -use sha2::{Digest, Sha256}; use std::env; use std::fs::File; use std::io::{BufRead, BufReader, Write}; @@ -67,12 +66,7 @@ async fn create_testnet_client() -> P2PNode { /// Compute content address (SHA256 hash). fn compute_address(data: &[u8]) -> XorName { - let mut hasher = Sha256::new(); - hasher.update(data); - let hash = hasher.finalize(); - let mut address = [0u8; 32]; - address.copy_from_slice(&hash); - address + saorsa_node::compute_address(data) } /// Generate random chunk data. diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 2655ea31..f4c96810 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -2,12 +2,31 @@ //! //! This module provides the core infrastructure for creating a local testnet //! of 25 saorsa nodes for E2E testing. - +//! +//! ## Protocol-Based Testing +//! +//! Each test node includes a `AntProtocol` handler that processes chunk +//! PUT/GET requests using the autonomi protocol messages. This allows E2E +//! tests to validate the complete protocol flow including: +//! 
- Message encoding/decoding (bincode serialization) +//! - Content address verification +//! - Payment verification (when enabled) +//! - Disk storage persistence + +use ant_evm::RewardsAddress; use bytes::Bytes; use rand::Rng; -use saorsa_core::{NodeConfig as CoreNodeConfig, P2PNode}; -use saorsa_node::client::{DataChunk, XorName}; -use sha2::{Digest, Sha256}; +use saorsa_core::{NodeConfig as CoreNodeConfig, P2PEvent, P2PNode}; +use saorsa_node::ant_protocol::{ + ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, + ChunkPutResponse, CHUNK_PROTOCOL_ID, +}; +use saorsa_node::client::{send_and_await_chunk_response, DataChunk, XorName}; +use saorsa_node::payment::{ + EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator, + QuotingMetricsTracker, +}; +use saorsa_node::storage::{AntProtocol, DiskStorage, DiskStorageConfig}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; @@ -64,6 +83,22 @@ const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60; /// Default timeout for chunk operations (seconds). const DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS: u64 = 30; +// ============================================================================= +// AntProtocol Test Configuration +// ============================================================================= + +/// Payment cache capacity for test nodes. +const TEST_PAYMENT_CACHE_CAPACITY: usize = 1000; + +/// Test rewards address (20 bytes, all 0x01). +const TEST_REWARDS_ADDRESS: [u8; 20] = [0x01; 20]; + +/// Max records for quoting metrics (test value). +const TEST_MAX_RECORDS: usize = 100_000; + +/// Initial records for quoting metrics (test value). 
+const TEST_INITIAL_RECORDS: usize = 1000; + // ============================================================================= // Default Node Counts // ============================================================================= @@ -279,6 +314,9 @@ pub struct TestNode { /// Reference to the running P2P node. pub p2p_node: Option>, + /// ANT protocol handler for processing chunk PUT/GET requests. + pub ant_protocol: Option>, + /// Is this a bootstrap node? pub is_bootstrap: bool, @@ -287,6 +325,9 @@ pub struct TestNode { /// Bootstrap addresses this node connects to. pub bootstrap_addrs: Vec, + + /// Protocol handler background task. + protocol_task: Option>, } impl TestNode { @@ -307,91 +348,368 @@ impl TestNode { } } + /// Get the list of connected peer IDs. + pub async fn connected_peers(&self) -> Vec { + if let Some(ref node) = self.p2p_node { + node.connected_peers().await + } else { + vec![] + } + } + // ========================================================================= - // Chunk Operations (immutable, content-addressed) + // Chunk Operations (via autonomi protocol messages) // ========================================================================= - /// Store a chunk on the network. + /// Store a chunk using the autonomi protocol. + /// + /// Creates a `ChunkPutRequest` message, sends it to the local `AntProtocol` + /// handler, and parses the `ChunkPutResponse`. /// /// Returns the content-addressed `XorName` where the chunk is stored. /// /// # Errors /// /// Returns an error if the node is not running, chunk exceeds max size, - /// storage fails, or operation times out. + /// protocol handling fails, or the response indicates an error. 
pub async fn store_chunk(&self, data: &[u8]) -> Result { - // Validate chunk size (max 4MB) - const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024; - if data.len() > MAX_CHUNK_SIZE { - return Err(TestnetError::Storage(format!( - "Chunk size {} exceeds maximum {} bytes", - data.len(), - MAX_CHUNK_SIZE - ))); - } - - let node = self.p2p_node.as_ref().ok_or(TestnetError::NodeNotRunning)?; + let protocol = self + .ant_protocol + .as_ref() + .ok_or(TestnetError::NodeNotRunning)?; - // Compute content address (SHA256 hash) + // Compute content address let address = Self::compute_chunk_address(data); - // Store in DHT with timeout + // Create PUT request with empty payment proof (EVM disabled in tests) + let empty_payment = rmp_serde::to_vec(&ant_evm::ProofOfPayment { + peer_quotes: vec![], + }) + .map_err(|e| { + TestnetError::Serialization(format!("Failed to serialize payment proof: {e}")) + })?; + + let request_id: u64 = rand::thread_rng().gen(); + let request = ChunkPutRequest::with_payment(address, data.to_vec(), empty_payment); + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::PutRequest(request), + }; + let message_bytes = message.encode().map_err(|e| { + TestnetError::Serialization(format!("Failed to encode PUT request: {e}")) + })?; + + // Handle the protocol message let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS); - tokio::time::timeout(timeout, node.dht_put(address, data.to_vec())) + let response_bytes = tokio::time::timeout(timeout, protocol.handle_message(&message_bytes)) .await .map_err(|_| { TestnetError::Storage(format!( "Timeout storing chunk after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s" )) })? 
- .map_err(|e| TestnetError::Storage(format!("Failed to store chunk: {e}")))?; + .map_err(|e| TestnetError::Storage(format!("Protocol error: {e}")))?; - debug!( - "Node {} stored chunk at {}", - self.index, - hex::encode(address) - ); - Ok(address) + // Parse response + let response = ChunkMessage::decode(&response_bytes) + .map_err(|e| TestnetError::Storage(format!("Failed to decode response: {e}")))?; + + match response.body { + ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => { + debug!("Node {} stored chunk at {}", self.index, hex::encode(addr)); + Ok(addr) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) => { + debug!( + "Node {} chunk already exists at {}", + self.index, + hex::encode(addr) + ); + Ok(addr) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => Err( + TestnetError::Storage(format!("Payment required: {message}")), + ), + ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => { + Err(TestnetError::Storage(format!("Protocol error: {e}"))) + } + _ => Err(TestnetError::Storage( + "Unexpected response type".to_string(), + )), + } } - /// Retrieve a chunk from the network. + /// Retrieve a chunk using the autonomi protocol. + /// + /// Creates a `ChunkGetRequest` message, sends it to the local `AntProtocol` + /// handler, and parses the `ChunkGetResponse`. /// /// # Errors /// - /// Returns an error if the node is not running, retrieval fails, or operation times out. + /// Returns an error if the node is not running, protocol handling fails, + /// or the response indicates an error. 
pub async fn get_chunk(&self, address: &XorName) -> Result> { - let node = self.p2p_node.as_ref().ok_or(TestnetError::NodeNotRunning)?; + let protocol = self + .ant_protocol + .as_ref() + .ok_or(TestnetError::NodeNotRunning)?; + + // Create GET request + let request_id: u64 = rand::thread_rng().gen(); + let request = ChunkGetRequest::new(*address); + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::GetRequest(request), + }; + let message_bytes = message.encode().map_err(|e| { + TestnetError::Serialization(format!("Failed to encode GET request: {e}")) + })?; + // Handle the protocol message let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS); - let result = tokio::time::timeout(timeout, node.dht_get(*address)) + let response_bytes = tokio::time::timeout(timeout, protocol.handle_message(&message_bytes)) .await .map_err(|_| { TestnetError::Retrieval(format!( "Timeout retrieving chunk after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s" )) - })?; - - match result { - Ok(Some(data)) => { - let chunk = DataChunk::new(*address, Bytes::from(data)); - Ok(Some(chunk)) + })? 
+ .map_err(|e| TestnetError::Retrieval(format!("Protocol error: {e}")))?; + + // Parse response + let response = ChunkMessage::decode(&response_bytes) + .map_err(|e| TestnetError::Retrieval(format!("Failed to decode response: {e}")))?; + + match response.body { + ChunkMessageBody::GetResponse(ChunkGetResponse::Success { address, content }) => { + debug!( + "Node {} retrieved chunk {} ({} bytes)", + self.index, + hex::encode(address), + content.len() + ); + Ok(Some(DataChunk::new(address, Bytes::from(content)))) + } + ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address }) => { + debug!( + "Node {} chunk not found: {}", + self.index, + hex::encode(address) + ); + Ok(None) + } + ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => { + Err(TestnetError::Retrieval(format!("Protocol error: {e}"))) } - Ok(None) => Ok(None), - Err(e) => Err(TestnetError::Retrieval(format!( - "Failed to retrieve chunk: {e}" - ))), + _ => Err(TestnetError::Retrieval( + "Unexpected response type".to_string(), + )), } } - /// Compute content address for chunk data. + // ========================================================================= + // Remote Chunk Operations (via P2P network) + // ========================================================================= + + /// Store a chunk on a remote node via P2P. + /// + /// Sends a `ChunkPutRequest` to the target node over the P2P network + /// and waits for the `ChunkPutResponse`. + /// + /// # Errors + /// + /// Returns an error if either node is not running, the message cannot be + /// sent, the response times out, or the remote node reports an error. 
+ pub async fn store_chunk_on(&self, target: &Self, data: &[u8]) -> Result { + let target_p2p = target + .p2p_node + .as_ref() + .ok_or(TestnetError::NodeNotRunning)?; + let target_peer_id = target_p2p + .transport_peer_id() + .ok_or_else(|| TestnetError::Core("No transport peer ID available".to_string()))?; + self.store_chunk_on_peer(&target_peer_id, data).await + } + + /// Store a chunk on a remote peer via P2P using the peer's ID directly. + /// + /// # Errors + /// + /// Returns an error if this node is not running, the message cannot be + /// sent, the response times out, or the remote peer reports an error. + pub async fn store_chunk_on_peer(&self, target_peer_id: &str, data: &[u8]) -> Result { + let p2p = self.p2p_node.as_ref().ok_or(TestnetError::NodeNotRunning)?; + let target_peer_id = target_peer_id.to_string(); + + // Create PUT request + let address = Self::compute_chunk_address(data); + let empty_payment = rmp_serde::to_vec(&ant_evm::ProofOfPayment { + peer_quotes: vec![], + }) + .map_err(|e| { + TestnetError::Serialization(format!("Failed to serialize payment proof: {e}")) + })?; + + let request_id: u64 = rand::thread_rng().gen(); + let request = ChunkPutRequest::with_payment(address, data.to_vec(), empty_payment); + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::PutRequest(request), + }; + let message_bytes = message.encode().map_err(|e| { + TestnetError::Serialization(format!("Failed to encode PUT request: {e}")) + })?; + + let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS); + let node_index = self.index; + + send_and_await_chunk_response( + p2p, + &target_peer_id, + message_bytes, + request_id, + timeout, + |body| match body { + ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => { + debug!( + "Node {} stored chunk on peer {}: {}", + node_index, + target_peer_id, + hex::encode(addr) + ); + Some(Ok(addr)) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { + 
address: addr, + }) => { + debug!( + "Node {} chunk already exists on peer {}: {}", + node_index, + target_peer_id, + hex::encode(addr) + ); + Some(Ok(addr)) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => { + Some(Err(TestnetError::Storage(format!( + "Payment required: {message}" + )))) + } + ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err( + TestnetError::Storage(format!("Remote protocol error: {e}")), + )), + _ => None, + }, + |e| TestnetError::Storage(format!("Failed to send PUT to remote node: {e}")), + || { + TestnetError::Storage(format!( + "Timeout waiting for remote store response after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s" + )) + }, + ) + .await + } + + /// Retrieve a chunk from a remote node via P2P. + /// + /// Sends a `ChunkGetRequest` to the target node over the P2P network + /// and waits for the `ChunkGetResponse`. + /// + /// # Errors + /// + /// Returns an error if either node is not running, the message cannot be + /// sent, the response times out, or the remote node reports an error. + pub async fn get_chunk_from( + &self, + target: &Self, + address: &XorName, + ) -> Result> { + let target_p2p = target + .p2p_node + .as_ref() + .ok_or(TestnetError::NodeNotRunning)?; + let target_peer_id = target_p2p + .transport_peer_id() + .ok_or_else(|| TestnetError::Core("No transport peer ID available".to_string()))?; + self.get_chunk_from_peer(&target_peer_id, address).await + } + + /// Retrieve a chunk from a remote peer via P2P using the peer's ID directly. + /// + /// # Errors + /// + /// Returns an error if this node is not running, the message cannot be + /// sent, the response times out, or the remote peer reports an error. 
+ pub async fn get_chunk_from_peer( + &self, + target_peer_id: &str, + address: &XorName, + ) -> Result> { + let p2p = self.p2p_node.as_ref().ok_or(TestnetError::NodeNotRunning)?; + let target_peer_id = target_peer_id.to_string(); + + // Create GET request + let request_id: u64 = rand::thread_rng().gen(); + let request = ChunkGetRequest::new(*address); + let message = ChunkMessage { + request_id, + body: ChunkMessageBody::GetRequest(request), + }; + let message_bytes = message.encode().map_err(|e| { + TestnetError::Serialization(format!("Failed to encode GET request: {e}")) + })?; + + let timeout = Duration::from_secs(DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS); + let node_index = self.index; + + send_and_await_chunk_response( + p2p, + &target_peer_id, + message_bytes, + request_id, + timeout, + |body| match body { + ChunkMessageBody::GetResponse(ChunkGetResponse::Success { + address: addr, + content, + }) => { + debug!( + "Node {} retrieved chunk from peer {}: {} ({} bytes)", + node_index, + target_peer_id, + hex::encode(addr), + content.len() + ); + Some(Ok(Some(DataChunk::new(addr, Bytes::from(content))))) + } + ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) => { + debug!( + "Node {} chunk not found on peer {}: {}", + node_index, + target_peer_id, + hex::encode(addr) + ); + Some(Ok(None)) + } + ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err( + TestnetError::Retrieval(format!("Remote protocol error: {e}")), + )), + _ => None, + }, + |e| TestnetError::Retrieval(format!("Failed to send GET to remote node: {e}")), + || { + TestnetError::Retrieval(format!( + "Timeout waiting for remote get response after {DEFAULT_CHUNK_OPERATION_TIMEOUT_SECS}s" + )) + }, + ) + .await + } + + /// Compute content address for chunk data (SHA256 hash). 
#[must_use] pub fn compute_chunk_address(data: &[u8]) -> XorName { - let mut hasher = Sha256::new(); - hasher.update(data); - let hash = hasher.finalize(); - let mut address = [0u8; 32]; - address.copy_from_slice(&hash); - address + saorsa_node::compute_address(data) } } @@ -576,6 +894,11 @@ impl TestNetwork { } /// Create a test node (but don't start it yet). + /// + /// Initializes the `AntProtocol` handler with: + /// - Disk storage in the node's data directory + /// - Payment verification disabled (for testing) + /// - Quote generation with a test rewards address async fn create_node( &self, index: usize, @@ -592,6 +915,9 @@ impl TestNetwork { tokio::fs::create_dir_all(&data_dir).await?; + // Initialize AntProtocol for this node + let ant_protocol = Self::create_ant_protocol(&data_dir).await?; + Ok(TestNode { index, node_id, @@ -599,12 +925,53 @@ impl TestNetwork { address, data_dir, p2p_node: None, + ant_protocol: Some(Arc::new(ant_protocol)), is_bootstrap, state: Arc::new(RwLock::new(NodeState::Pending)), bootstrap_addrs, + protocol_task: None, }) } + /// Create an `AntProtocol` handler for a test node. 
+ /// + /// Configures: + /// - Disk storage with verification enabled + /// - Payment verification disabled (for testing without Anvil) + /// - Quote generator with a test rewards address + async fn create_ant_protocol(data_dir: &std::path::Path) -> Result { + // Create disk storage + let storage_config = DiskStorageConfig { + root_dir: data_dir.to_path_buf(), + verify_on_read: true, + max_chunks: 0, // Unlimited for tests + }; + let storage = DiskStorage::new(storage_config) + .await + .map_err(|e| TestnetError::Core(format!("Failed to create disk storage: {e}")))?; + + // Create payment verifier with EVM disabled + let payment_config = PaymentVerifierConfig { + evm: EvmVerifierConfig { + enabled: false, // Disable EVM verification for tests + ..Default::default() + }, + cache_capacity: TEST_PAYMENT_CACHE_CAPACITY, + }; + let payment_verifier = PaymentVerifier::new(payment_config); + + // Create quote generator with test rewards address + let rewards_address = RewardsAddress::new(TEST_REWARDS_ADDRESS); + let metrics_tracker = QuotingMetricsTracker::new(TEST_MAX_RECORDS, TEST_INITIAL_RECORDS); + let quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker); + + Ok(AntProtocol::new( + Arc::new(storage), + Arc::new(payment_verifier), + Arc::new(quote_generator), + )) + } + /// Start a single node. 
async fn start_node(&mut self, mut node: TestNode) -> Result<()> { debug!("Starting node {} on port {}", node.index, node.port); @@ -633,6 +1000,55 @@ impl TestNetwork { node.p2p_node = Some(Arc::new(p2p_node)); *node.state.write().await = NodeState::Running; + // Start protocol handler that routes incoming P2P messages to AntProtocol + if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + let mut events = p2p.subscribe_events(); + let p2p_clone = Arc::clone(p2p); + let protocol_clone = Arc::clone(protocol); + let node_index = node.index; + node.protocol_task = Some(tokio::spawn(async move { + while let Ok(event) = events.recv().await { + if let P2PEvent::Message { + topic, + source, + data, + } = event + { + if topic == CHUNK_PROTOCOL_ID { + debug!( + "Node {} received chunk protocol message from {}", + node_index, source + ); + let protocol = Arc::clone(&protocol_clone); + let p2p = Arc::clone(&p2p_clone); + tokio::spawn(async move { + match protocol.handle_message(&data).await { + Ok(response) => { + if let Err(e) = p2p + .send_message( + &source, + CHUNK_PROTOCOL_ID, + response.to_vec(), + ) + .await + { + warn!( + "Node {} failed to send response to {}: {}", + node_index, source, e + ); + } + } + Err(e) => { + warn!("Node {} protocol handler error: {}", node_index, e); + } + } + }); + } + } + } + })); + } + debug!("Node {} started successfully", node.index); self.nodes.push(node); Ok(()) @@ -749,6 +1165,9 @@ impl TestNetwork { // Stop all nodes in reverse order for node in self.nodes.iter_mut().rev() { debug!("Stopping node {}", node.index); + if let Some(handle) = node.protocol_task.take() { + handle.abort(); + } if let Some(ref p2p) = node.p2p_node { if let Err(e) = p2p.shutdown().await { warn!("Error shutting down node {}: {}", node.index, e);