diff --git a/.vscode/settings.json b/.vscode/settings.json index a4653015..eede8944 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,22 +1,23 @@ { - "rust-analyzer.check.command": "clippy", - "rust-analyzer.check.extraArgs": [ - "--", - "-D", - "warnings", - "-W", - "clippy::pedantic", - "-W", - "clippy::nursery", - "-W", - "clippy::style", - "-W", - "clippy::complexity", - "-W", - "clippy::perf", - "-W", - "clippy::suspicious", - "-W", - "clippy::correctness" - ] + "rust-analyzer.check.command": "clippy", + "rust-analyzer.check.extraArgs": [ + "--", + "-D", + "warnings", + "-W", + "clippy::pedantic", + "-W", + "clippy::nursery", + "-W", + "clippy::style", + "-W", + "clippy::complexity", + "-W", + "clippy::perf", + "-W", + "clippy::suspicious", + "-W", + "clippy::correctness" + ], + "idf.pythonInstallPath": "/usr/local/bin/python3" } diff --git a/Cargo.lock b/Cargo.lock index 6aa183b5..bb250eba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,6 +725,8 @@ dependencies = [ name = "atoma-p2p" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "blake3", "bytes", "chrono", @@ -741,6 +743,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "sqlx", "sui-keys", "sui-sdk", "sysinfo", @@ -1420,6 +1423,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "cbor4ii" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.2.16" @@ -4303,6 +4315,7 @@ dependencies = [ "libp2p-metrics", "libp2p-noise", "libp2p-quic", + "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", @@ -4591,12 +4604,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "548fe44a80ff275d400f1b26b090d441d83ef73efabbeb6415f4ce37e5aed865" dependencies = [ "async-trait", + "cbor4ii", "futures", "futures-bounded", "libp2p-core", "libp2p-identity", 
"libp2p-swarm", "rand 0.8.5", + "serde", "smallvec", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index e8588e90..5754a65e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ license = "Apache-2.0" [workspace.dependencies] aes-gcm = "0.10.3" +async-trait = "0.1.86" anyhow = "1.0.97" atoma-confidential = { path = "./atoma-confidential" } atoma-daemon = { path = "./atoma-daemon" } diff --git a/atoma-bin/atoma_node.rs b/atoma-bin/atoma_node.rs index e32ecf31..057bec2f 100644 --- a/atoma-bin/atoma_node.rs +++ b/atoma-bin/atoma_node.rs @@ -196,7 +196,9 @@ async fn main() -> Result<()> { let p2p_node_service_handle = spawn_with_shutdown( async move { let p2p_node = - AtomaP2pNode::start(config.p2p, Arc::new(keystore), p2p_event_sender, false)?; + AtomaP2pNode::start(config.p2p, Arc::new(keystore), p2p_event_sender, false) + .await + .map_err(|e| anyhow::anyhow!("Failed to start P2P node: {}", e))?; let pinned_future = Box::pin(p2p_node.run(p2p_node_service_shutdown_receiver)); pinned_future.await }, diff --git a/atoma-p2p-tester/src/main.rs b/atoma-p2p-tester/src/main.rs index 70f4ca11..f02c5e36 100644 --- a/atoma-p2p-tester/src/main.rs +++ b/atoma-p2p-tester/src/main.rs @@ -62,10 +62,11 @@ async fn main() -> Result<(), Box> { // Create and start the P2P node let keystore: FileBasedKeystore = FileBasedKeystore::new(&PathBuf::from(&config.sui.sui_keystore_path())) - .context("Failed to create keystore")?; + .with_context(|| "Failed to create keystore")?; let node = AtomaP2pNode::start(config.p2p, Arc::new(keystore), atoma_p2p_sender, false) - .context("Failed to start P2P node")?; + .await + .with_context(|| "Failed to start P2P node")?; let atoma_p2p_node_handle = spawn_with_shutdown(node.run(shutdown_receiver.clone()), shutdown_sender.clone()); diff --git a/atoma-p2p/Cargo.toml b/atoma-p2p/Cargo.toml index de0a63e2..bedc7ca6 100644 --- a/atoma-p2p/Cargo.toml +++ b/atoma-p2p/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true license.workspace = true 
[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } blake3 = { workspace = true } bytes = { workspace = true } ciborium = { workspace = true } @@ -20,12 +22,14 @@ libp2p = { workspace = true, features = [ "mdns", "kad", "macros", + "request-response", "quic", "tcp", "yamux", "noise", "metrics", "rsa", + "cbor", ] } fastcrypto = { workspace = true } flume = { workspace = true } @@ -39,6 +43,7 @@ sui-keys = { workspace = true } sui-sdk = { workspace = true } serde_json = { workspace = true } sysinfo = { workspace = true } +sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/atoma-p2p/src/config.rs b/atoma-p2p/src/config.rs index 56db7ecc..3e3cd98e 100644 --- a/atoma-p2p/src/config.rs +++ b/atoma-p2p/src/config.rs @@ -47,6 +47,9 @@ pub struct AtomaP2pNodeConfig { /// of the form (`serving_engine`, `metrics_endpoint`) /// (e.g. 
`"meta-llama/Llama-3.2-3B-Instruct" => ("vllm", "http://chat-completions:8000/metrics")`) pub metrics_endpoints: HashMap, + + /// Database connection URL for `PostgreSQL` + pub database_url: String, } impl AtomaP2pNodeConfig { diff --git a/atoma-p2p/src/errors.rs b/atoma-p2p/src/errors.rs index 2f986502..66595b32 100644 --- a/atoma-p2p/src/errors.rs +++ b/atoma-p2p/src/errors.rs @@ -62,4 +62,14 @@ pub enum AtomaP2pNodeError { PublishError(String), #[error("DNS resolver error: `{0}`")] DnsError(#[from] std::io::Error), + #[error("Failed to connect to database: `{0}`")] + DatabaseConnectionError(#[from] sqlx::Error), + #[error("External error: `{0}`")] + External(String), +} + +impl From for AtomaP2pNodeError { + fn from(err: anyhow::Error) -> Self { + Self::External(err.to_string()) + } } diff --git a/atoma-p2p/src/handlers.rs b/atoma-p2p/src/handlers.rs new file mode 100644 index 00000000..cb9d0eec --- /dev/null +++ b/atoma-p2p/src/handlers.rs @@ -0,0 +1,514 @@ +use crate::errors::AtomaP2pNodeError; +use crate::metrics::{ + KAD_ROUTING_TABLE_SIZE, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, + TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, +}; +use crate::service::{AtomaP2pBehaviour, AtomaP2pBehaviourEvent, StateManagerEvent}; +use crate::stack_request_response::StackLeader; +use crate::types::SerializeWithSignature; +use crate::types::SignedNodeMessage; +use crate::utils::validate_signed_node_message; +use crate::AtomaP2pEvent; +use bytes::Bytes; +use flume::Sender; +use libp2p::metrics::Metrics; +use libp2p::metrics::Recorder; +use libp2p::{gossipsub, swarm::SwarmEvent}; +use libp2p::{kad, mdns, request_response, PeerId, Swarm}; +use opentelemetry::KeyValue; +use tracing::{debug, error, instrument, trace}; + +/// # Panics +/// +/// This function will panic if: +/// - `peer_id` is `None` when unwrapping in the `Dialing` and `OutgoingConnectionError` events 
+#[allow(clippy::too_many_lines)] +pub async fn handle_p2p_event( + swarm: &mut Swarm, + state_manager_sender: &Sender, + event: SwarmEvent, + metrics: &mut Metrics, + is_client: bool, + stack_leader: &StackLeader, +) { + match event { + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Message { + message_id, + message, + propagation_source, + })) => { + match handle_gossipsub_message( + swarm, + state_manager_sender, + message.data.into(), + &message_id, + &propagation_source, + is_client, + ) + .await + { + Ok(()) => { + TOTAL_GOSSIPSUB_MESSAGES_FORWARDED.add( + 1, + &[KeyValue::new("peerId", swarm.local_peer_id().to_base58())], + ); + } + Err(e) => { + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED.add( + 1, + &[KeyValue::new("peerId", swarm.local_peer_id().to_base58())], + ); + error!( + target = "atoma-p2p", + event = "gossipsub_message_error", + error = %e, + "Failed to handle gossipsub message" + ); + } + } + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub( + gossipsub::Event::Subscribed { peer_id, topic }, + )) => { + // Record subscript metrics + TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(1, &[KeyValue::new("topic", topic.to_string())]); + metrics.record(&gossipsub::Event::Subscribed { + peer_id, + topic: topic.clone(), + }); + + debug!( + target = "atoma-p2p", + event = "gossipsub_subscribed", + peer_id = %peer_id, + topic = %topic, + "Peer subscribed to topic" + ); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub( + gossipsub::Event::Unsubscribed { peer_id, topic }, + )) => { + // Record unsubscription metrics + TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(-1, &[KeyValue::new("topic", topic.to_string())]); + metrics.record(&gossipsub::Event::Unsubscribed { + peer_id, + topic: topic.clone(), + }); + + debug!( + target = "atoma-p2p", + event = "gossipsub_unsubscribed", + peer_id = %peer_id, + topic = %topic, + "Peer unsubscribed from topic" + ); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Discovered( + 
discovered_peers, + ))) => { + let peer_count = discovered_peers.len() as u64; + debug!( + target = "atoma-p2p", + event = "mdns_discovered", + peer_count = %peer_count, + "Mdns discovered peers" + ); + for (peer_id, multiaddr) in discovered_peers { + debug!( + target = "atoma-p2p", + event = "mdns_discovered_peer", + peer_id = %peer_id, + multiaddr = %multiaddr, + "Mdns discovered peer" + ); + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, multiaddr); + } + // Record discovery metrics + TOTAL_MDNS_DISCOVERIES.add( + peer_count, + &[KeyValue::new("peerId", swarm.local_peer_id().to_base58())], + ); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Expired( + expired_peers, + ))) => { + debug!( + target = "atoma-p2p", + event = "mdns_expired", + num_expired_peers = %expired_peers.len(), + "Mdns expired" + ); + for (peer_id, multiaddr) in expired_peers { + debug!( + target = "atoma-p2p", + event = "mdns_expired_peer", + peer_id = %peer_id, + multiaddr = %multiaddr, + "Mdns expired peer" + ); + swarm + .behaviour_mut() + .kademlia + .remove_address(&peer_id, &multiaddr); + swarm + .behaviour_mut() + .gossipsub + .remove_explicit_peer(&peer_id); + } + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad::Event::RoutingUpdated { + peer, + is_new_peer, + addresses, + bucket_range, + old_peer, + })) => { + debug!( + target = "atoma-p2p", + event = "kademlia_routing_updated", + peer = %peer, + is_new_peer = %is_new_peer, + addresses = ?addresses, + bucket_range = ?bucket_range, + old_peer = ?old_peer, + "Kademlia routing updated" + ); + KAD_ROUTING_TABLE_SIZE.record( + addresses.len() as u64, + &[KeyValue::new("peerId", peer.to_base58())], + ); + metrics.record(&kad::Event::RoutingUpdated { + peer, + is_new_peer, + addresses, + bucket_range, + old_peer, + }); + } + SwarmEvent::ConnectionEstablished { + peer_id, + num_established, + established_in, + connection_id, + .. 
+ } => { + debug!( + target = "atoma-p2p", + event = "peer_connection_established", + peer_id = %peer_id, + num_established = %num_established, + established_in = ?established_in, + connection_id = %connection_id, + "Peer connection established" + ); + } + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + num_established, + .. + } => { + debug!( + target = "atoma-p2p", + event = "peer_connection_closed", + peer_id = %peer_id, + connection_id = %connection_id, + num_established = %num_established, + "Peer connection closed" + ); + } + SwarmEvent::NewListenAddr { + listener_id, + address, + .. + } => { + debug!( + target = "atoma-p2p", + event = "new_listen_addr", + listener_id = %listener_id, + address = %address, + "New listen address" + ); + } + SwarmEvent::ExpiredListenAddr { + listener_id, + address, + } => { + debug!( + target = "atoma-p2p", + event = "expired_listen_addr", + listener_id = %listener_id, + address = %address, + "Expired listen address" + ); + } + SwarmEvent::Dialing { + peer_id, + connection_id, + .. + } => { + debug!( + target = "atoma-p2p", + event = "dialing", + peer_id = ?peer_id, + connection_id = %connection_id, + "Dialing peer" + ); + TOTAL_DIALS_ATTEMPTED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. 
} => { + TOTAL_DIALS_FAILED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); + error!( + target = "atoma-p2p", + event = "outgoing_connection_error", + peer_id = ?peer_id, + error = %error, + "Outgoing connection error" + ); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Identify(identify_event)) => { + tracing::debug!( + target = "atoma-p2p", + event = "identify", + identify_event = ?identify_event, + "Identify event" + ); + metrics.record(&identify_event); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad_event)) => { + tracing::debug!( + target = "atoma-p2p", + event = "kad", + kad_event = ?kad_event, + "Kad event" + ); + metrics.record(&kad_event); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::StackLeaderRequestResponse( + request_response::Event::Message { message, .. }, + )) => match message { + request_response::Message::Request { + request, channel, .. + } => { + trace!( + target = "atoma-p2p", + event = "stack_leader_request", + "Stack leader request" + ); + let stack_leader_response = stack_leader.can_proceed(&request).await; + if let Err(e) = swarm + .behaviour_mut() + .stack_leader_request_response + .send_response(channel, stack_leader_response) + { + error!( + target = "atoma-p2p", + event = "stack_leader_response_error", + error = ?e, + "Failed to send stack leader response" + ); + } + } + request_response::Message::Response { .. } => { + trace!( + target = "atoma-p2p", + event = "stack_leader_response", + "Stack leader response" + ); + } + }, + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::StackLeaderRequestResponse( + request_response::Event::ResponseSent { .. 
}, + )) => { + trace!( + target = "atoma-p2p", + event = "stack_leader_response_sent", + "Stack leader response sent" + ); + } + swarm_event => { + debug!( + target = "atoma-p2p", + event = "swarm_event", + swarm_event = ?swarm_event, + "Swarm event" + ); + metrics.record(&swarm_event); + } + } +} + +/// Handles incoming gossipsub messages in the P2P network. +/// +/// This method processes UsageMetrics messages by: +/// 1. Validating the message's signature and timestamp +/// 2. Verifying the node's ownership of its small ID +/// 3. Reporting the validation result to the gossipsub protocol +/// 4. Storing the node's public URL in the state manager (if this peer is a client) +/// +/// # Message Flow +/// +/// 1. Receives a message from the gossipsub network +/// 2. Skips processing if the message is from self +/// 3. Deserializes the message into a GossipMessage +/// 4. For UsageMetrics messages: +/// - Validates the message using `validate_usage_metrics_message` +/// - Reports validation result to gossipsub protocol +/// - If this peer is a client, stores the node's public URL in state manager +/// +/// # Arguments +/// +/// * `message_data` - Raw bytes of the received gossipsub message (CBOR encoded) +/// * `message_id` - Unique identifier for the gossipsub message +/// * `propagation_source` - The peer that forwarded this message +/// +/// # Returns +/// +/// Returns `Ok(())` if message processing succeeds, or an error if: +/// - Message deserialization fails +/// - Message validation fails +/// - Reporting validation result fails +/// - Storing public URL fails (for clients) +/// +/// # Errors +/// +/// This function can return the following errors: +/// * `UsageMetricsSerializeError` - If CBOR deserialization fails +/// * `StateManagerError` - If storing public URL fails (for clients) +/// +/// # Security Considerations +/// +/// - Messages from self are ignored to prevent message loops +/// - Messages are validated before processing +/// - Only clients store 
public URLs to prevent unnecessary data storage +/// - Uses CBOR for efficient binary serialization +/// +/// # Example +/// +/// ```rust,ignore +/// let message_data = // ... received from gossipsub ...; +/// let message_id = // ... message identifier ...; +/// let propagation_source = // ... peer ID ...; +/// +/// match node.handle_gossipsub_message(&message_data, &message_id, &propagation_source).await { +/// Ok(()) => println!("Message processed successfully"), +/// Err(e) => println!("Failed to process message: {}", e), +/// } +/// ``` +/// +/// # Message Validation +/// +/// Messages are validated by: +/// 1. Checking the URL format and timestamp freshness +/// 2. Verifying the cryptographic signature +/// 3. Confirming the node's ownership of its small ID +/// +/// Invalid messages are rejected and not propagated further in the network. +#[instrument(level = "debug", skip_all)] +pub async fn handle_gossipsub_message( + swarm: &mut Swarm, + state_manager_sender: &Sender, + message_data: Bytes, + message_id: &gossipsub::MessageId, + propagation_source: &PeerId, + is_client: bool, +) -> Result<(), AtomaP2pNodeError> { + debug!( + target = "atoma-p2p", + event = "gossipsub_message", + message_id = %message_id, + propagation_source = %propagation_source, + "Received gossipsub message" + ); + if propagation_source == swarm.local_peer_id() { + debug!( + target = "atoma-p2p", + event = "gossipsub_message_from_self", + "Gossipsub message from self" + ); + // Do not re-publish the node's own message, just return `Ok(()) + return Ok(()); + } + // Directly deserialize SignedNodeMessage using new method + let signed_node_message = SignedNodeMessage::deserialize_with_signature(&message_data)?; + let signature_len = signed_node_message.signature.len(); + debug!( + target = "atoma-p2p", + event = "gossipsub_message_data", + message_id = %message_id, + propagation_source = %propagation_source, + "Received gossipsub message data" + ); + let node_message = 
&signed_node_message.node_message; + let node_message_hash = blake3::hash(&message_data[signature_len..]); + let message_acceptance = match validate_signed_node_message( + node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + state_manager_sender, + ) + .await + { + Ok(()) => gossipsub::MessageAcceptance::Accept, + Err(e) => { + error!( + target = "atoma-p2p", + event = "gossipsub_message_validation_error", + error = %e, + "Failed to validate gossipsub message" + ); + // NOTE: We should reject the message if it fails to validate + // as it means the node is not being following the current protocol + if let AtomaP2pNodeError::UrlParseError(_) = e { + // We remove the peer from the gossipsub topic, because it is not a valid URL and therefore cannot be reached + // by clients for processing OpenAI api compatible AI requests, so these peers are not useful for the network + swarm + .behaviour_mut() + .gossipsub + .remove_explicit_peer(propagation_source); + } + gossipsub::MessageAcceptance::Reject + } + }; + // Report the message validation result to the gossipsub protocol + let is_in_mempool = swarm + .behaviour_mut() + .gossipsub + .report_message_validation_result(message_id, propagation_source, message_acceptance); + if is_in_mempool { + debug!( + target = "atoma-p2p", + event = "gossipsub_message_in_mempool", + message_id = %message_id, + propagation_source = %propagation_source, + "Gossipsub message already in the mempool, no need to take further actions" + ); + return Ok(()); + } + // If the current peer is a client, we need to store the public URL in the state manager + if is_client { + let node_message = signed_node_message.node_message; + let event = AtomaP2pEvent::NodeMetricsRegistrationEvent { + public_url: node_message.node_metadata.node_public_url, + node_small_id: node_message.node_metadata.node_small_id, + timestamp: node_message.node_metadata.timestamp, + country: node_message.node_metadata.country, + node_metrics: 
node_message.node_metrics, + }; + state_manager_sender.send((event, None)).map_err(|e| { + error!( + target = "atoma-p2p", + event = "gossipsub_message_state_manager_error", + error = %e, + "Failed to send event to state manager" + ); + AtomaP2pNodeError::StateManagerError(e) + })?; + } + + Ok(()) +} diff --git a/atoma-p2p/src/lib.rs b/atoma-p2p/src/lib.rs index e0b720ec..f3656033 100644 --- a/atoma-p2p/src/lib.rs +++ b/atoma-p2p/src/lib.rs @@ -2,8 +2,10 @@ pub mod broadcast_metrics; pub mod config; pub mod constants; pub mod errors; +pub mod handlers; pub mod metrics; pub mod service; +pub mod stack_request_response; pub mod timer; pub mod types; pub mod utils; diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index c1766da8..d14d1602 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -1,18 +1,15 @@ use crate::{ config::AtomaP2pNodeConfig, errors::AtomaP2pNodeError, + handlers::handle_p2p_event, metrics::{ - NetworkMetrics, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, - TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, - }, - metrics::{ - KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, - TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, TOTAL_GOSSIPSUB_PUBLISHES, - TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, TOTAL_OUTGOING_CONNECTIONS, + NetworkMetrics, PEERS_CONNECTED, TOTAL_CONNECTIONS, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, + TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_INCOMING_CONNECTIONS, TOTAL_OUTGOING_CONNECTIONS, }, + stack_request_response::{NodeComputeRequest, StackLeader, StackLeaderResponse}, timer::usage_metrics_timer_task, types::{AtomaP2pEvent, NodeMessage, SerializeWithSignature, SignedNodeMessage}, - utils::{extract_gossipsub_metrics, validate_signed_node_message}, + utils::extract_gossipsub_metrics, }; use bytes::{BufMut, Bytes, BytesMut}; use flume::Sender; @@ -21,15 +18,17 @@ use libp2p::{ autonat, gossipsub::{self}, identify, identity, kad, mdns, noise, - 
swarm::{NetworkBehaviour, SwarmEvent}, + request_response::{self, ProtocolSupport}, + swarm::NetworkBehaviour, tcp, yamux, PeerId, StreamProtocol, Swarm, SwarmBuilder, }; use libp2p::{ - metrics::{Metrics, Recorder, Registry}, + metrics::{Metrics, Registry}, Multiaddr, }; use opentelemetry::KeyValue; use rand::rngs::OsRng; +use sqlx::PgPool; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use std::time::Duration; @@ -38,7 +37,7 @@ use tokio::{ sync::{mpsc::UnboundedReceiver, oneshot, watch}, task::JoinHandle, }; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, warn}; /// The topic that the P2P network will use to gossip messages const METRICS_GOSPUBSUB_TOPIC: &str = "atoma-p2p-usage-metrics"; @@ -49,6 +48,9 @@ const METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(15); /// The protocol name for the Kademlia DHT const IPFS_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0"); +/// The protocol name for the Stack Leader +const STACK_LEADER_PROTO_NAME: StreamProtocol = StreamProtocol::new("/atoma/stack-leader/0.0.1"); + // Well connected nodes to bootstrap the network (see https://docs.ipfs.tech/concepts/public-utilities/#amino-dht-bootstrappers) const BOOTSTRAP_NODES: [&str; 4] = [ "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", @@ -64,14 +66,14 @@ pub type StateManagerEvent = (AtomaP2pEvent, Option>); /// This struct implements the `NetworkBehaviour` trait and coordinates three main networking components /// for peer discovery, message broadcasting, and distributed routing. #[derive(NetworkBehaviour)] -struct AtomaP2pBehaviour { +pub struct AtomaP2pBehaviour { /// Handles autonat protocol autonat: autonat::v2::client::Behaviour, /// Handles publish-subscribe messaging across the P2P network. /// Used for broadcasting node addresses and other network messages using a gossip protocol /// that ensures efficient message propagation. 
- gossipsub: gossipsub::Behaviour, + pub gossipsub: gossipsub::Behaviour, /// Provides a way to identify the node and its capabilities. /// Used to discover nodes in the network and to share information about the node, @@ -81,12 +83,17 @@ struct AtomaP2pBehaviour { /// Provides distributed hash table (DHT) functionality for peer discovery and routing. /// Helps maintain network connectivity in larger, distributed deployments by implementing /// the Kademlia protocol with a memory-based storage backend. - kademlia: kad::Behaviour, + pub kademlia: kad::Behaviour, /// Enables automatic peer discovery on local networks using multicast DNS. /// Particularly useful for development and local testing environments where nodes /// need to find each other without explicit configuration. mdns: mdns::tokio::Behaviour, + + /// Provides a way to request-response messages across the P2P network. + /// Used for requesting compute units from the stack leader. + pub stack_leader_request_response: + request_response::cbor::Behaviour, } /// A P2P node implementation for the Atoma network that handles peer discovery, @@ -125,6 +132,9 @@ pub struct AtomaP2pNode { /// Add registry field metrics_registry: Registry, + + /// Add stack leader + stack_leader: StackLeader, } impl AtomaP2pNode { @@ -202,7 +212,7 @@ impl AtomaP2pNode { /// - Peer connections are authenticated /// - The node validates all incoming messages #[instrument(level = "debug", skip_all)] - pub fn start( + pub async fn start( config: AtomaP2pNodeConfig, keystore: Arc, state_manager_sender: Sender, @@ -283,6 +293,11 @@ impl AtomaP2pNode { key.public(), )); + let stack_leader_request_response = request_response::cbor::Behaviour::new( + [(STACK_LEADER_PROTO_NAME, ProtocolSupport::Full)], + request_response::Config::default(), + ); + let autonat = autonat::v2::client::Behaviour::new( OsRng, autonat::v2::client::Config::default() @@ -295,6 +310,7 @@ impl AtomaP2pNode { identify, kademlia, mdns, + stack_leader_request_response, 
}) }) .map_err(|e| { @@ -394,10 +410,10 @@ impl AtomaP2pNode { for peer_id in BOOTSTRAP_NODES { match peer_id.parse::() { Ok(peer_id) => { - swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, "/dnsaddr/bootstrap.libp2p.io".parse()?); + swarm.behaviour_mut().kademlia.add_address( + &peer_id, + "/213.130.147.75/config.port_listening_port".parse()?, + ); debug!( target = "atoma-p2p", event = "dialed_bootstrap_node", @@ -417,6 +433,20 @@ impl AtomaP2pNode { } } + // Initialize the StackLeader with database connection + let stack_leader = { + let db_pool = PgPool::connect(&config.database_url).await.map_err(|e| { + error!( + target = "atoma-p2p", + event = "database_connection_error", + error = %e, + "Failed to connect to database" + ); + AtomaP2pNodeError::DatabaseConnectionError(e) + })?; + StackLeader::new(db_pool) + }; + Ok(Self { keystore, swarm, @@ -426,6 +456,7 @@ impl AtomaP2pNode { is_client, network_metrics, metrics_registry, + stack_leader, }) } @@ -515,7 +546,7 @@ impl AtomaP2pNode { ) -> Result<(), AtomaP2pNodeError> { // Create a metrics update interval let mut metrics_interval = tokio::time::interval(METRICS_UPDATE_INTERVAL); - let metrics = Metrics::new(&mut self.metrics_registry); + let mut metrics = Metrics::new(&mut self.metrics_registry); let peer_id = self.swarm.local_peer_id().to_base58(); loop { @@ -541,265 +572,7 @@ impl AtomaP2pNode { } event = self.swarm.select_next_some() => { - match event { - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Message { - message_id, - message, - propagation_source, - })) => { - match self.handle_gossipsub_message(message.data.into(), &message_id, &propagation_source).await { - Ok(()) => { - info!( - target = "atoma-p2p", - event = "gossipsub_message_forwarded", - "Gossipsub message forwarded" - ); - TOTAL_GOSSIPSUB_MESSAGES_FORWARDED.add(1, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); - } - Err(e) => { - 
TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED.add(1, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); - error!( - target = "atoma-p2p", - event = "gossipsub_message_error", - error = %e, - "Failed to handle gossipsub message" - ); - } - } - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { - peer_id, - topic, - })) => { - // Record subscript metrics - TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(1, &[KeyValue::new("topic", topic.to_string())]); - metrics.record(&gossipsub::Event::Subscribed { - peer_id, - topic: topic.clone(), - }); - - trace!( - target = "atoma-p2p", - event = "gossipsub_subscribed", - peer_id = %peer_id, - topic = %topic, - "Peer subscribed to topic" - ); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Unsubscribed { - peer_id, - topic, - })) => { - // Record unsubscription metrics - TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(-1, &[KeyValue::new("topic", topic.to_string())]); - metrics.record(&gossipsub::Event::Unsubscribed { - peer_id, - topic: topic.clone(), - }); - - debug!( - target = "atoma-p2p", - event = "gossipsub_unsubscribed", - peer_id = %peer_id, - topic = %topic, - "Peer unsubscribed from topic" - ); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Discovered(discovered_peers))) => { - let peer_count = discovered_peers.len() as u64; - debug!( - target = "atoma-p2p", - event = "mdns_discovered", - peer_count = %peer_count, - "Mdns discovered peers" - ); - for (peer_id, multiaddr) in discovered_peers { - debug!( - target = "atoma-p2p", - event = "mdns_discovered_peer", - peer_id = %peer_id, - multiaddr = %multiaddr, - "Mdns discovered peer" - ); - self.swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); - } - // Record discovery metrics - TOTAL_MDNS_DISCOVERIES.add(peer_count, &[KeyValue::new("peerId", peer_id.clone())]); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Expired(expired_peers))) => { - debug!( - 
target = "atoma-p2p", - event = "mdns_expired", - num_expired_peers = %expired_peers.len(), - "Mdns expired" - ); - for (peer_id, multiaddr) in expired_peers { - debug!( - target = "atoma-p2p", - event = "mdns_expired_peer", - peer_id = %peer_id, - multiaddr = %multiaddr, - "Mdns expired peer" - ); - self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, &multiaddr); - self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); - } - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad::Event::RoutingUpdated { - peer, - is_new_peer, - addresses, - bucket_range, - old_peer, - })) => { - trace!( - target = "atoma-p2p", - event = "kademlia_routing_updated", - peer = %peer, - is_new_peer = %is_new_peer, - addresses = ?addresses, - bucket_range = ?bucket_range, - old_peer = ?old_peer, - "Kademlia routing updated" - ); - KAD_ROUTING_TABLE_SIZE.record(addresses.len() as u64, &[KeyValue::new("peerId", peer.to_base58())]); - metrics.record(&kad::Event::RoutingUpdated { - peer, - is_new_peer, - addresses, - bucket_range, - old_peer, - }); - } - SwarmEvent::ConnectionEstablished { - peer_id, - num_established, - established_in, - connection_id, - .. - } => { - trace!( - target = "atoma-p2p", - event = "peer_connection_established", - peer_id = %peer_id, - num_established = %num_established, - established_in = ?established_in, - connection_id = %connection_id, - "Peer connection established" - ); - } - SwarmEvent::ConnectionClosed { - peer_id, - connection_id, - num_established, - .. - } => { - trace!( - target = "atoma-p2p", - event = "peer_connection_closed", - peer_id = %peer_id, - connection_id = %connection_id, - num_established = %num_established, - "Peer connection closed" - ); - } - SwarmEvent::NewListenAddr { - listener_id, - address, - .. 
- } => { - info!( - target = "atoma-p2p", - event = "new_listen_addr", - listener_id = %listener_id, - address = %address, - "New listen address" - ); - } - SwarmEvent::ExpiredListenAddr { - listener_id, - address, - } => { - debug!( - target = "atoma-p2p", - event = "expired_listen_addr", - listener_id = %listener_id, - address = %address, - "Expired listen address" - ); - } - SwarmEvent::Dialing { - peer_id, - connection_id, - .. - } => { - debug!( - target = "atoma-p2p", - event = "dialing", - peer_id = ?peer_id, - connection_id = %connection_id, - "Dialing peer" - ); - TOTAL_DIALS_ATTEMPTED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); - } - SwarmEvent::IncomingConnection { - .. - } => { - info!( - target = "atoma-p2p", - event = "incoming_connection", - "Incoming connection" - ); - } - SwarmEvent::IncomingConnectionError { - .. - } => { - error!( - target = "atoma-p2p", - event = "incoming_connection_error", - "Incoming connection error" - ); - } - SwarmEvent::OutgoingConnectionError { - peer_id, - .. 
- } => { - debug!( - target = "atoma-p2p", - event = "outgoing_connection_error", - peer_id = ?peer_id, - "Outgoing connection error" - ); - TOTAL_DIALS_FAILED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Identify(identify_event)) => { - tracing::debug!( - target = "atoma-p2p", - event = "identify", - identify_event = ?identify_event, - "Identify event" - ); - metrics.record(&identify_event); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad_event)) => { - tracing::debug!( - target = "atoma-p2p", - event = "kad", - kad_event = ?kad_event, - "Kad event" - ); - metrics.record(&kad_event); - } - swarm_event => { - tracing::debug!( - target = "atoma-p2p", - event = "swarm_event", - swarm_event = ?swarm_event, - "Swarm event" - ); - metrics.record(&swarm_event); - } - } + handle_p2p_event(&mut self.swarm, &self.state_manager_sender, event, &mut metrics, self.is_client, &self.stack_leader).await; } Some(usage_metrics) = self.usage_metrics_rx.recv() => { if let Err(e) = self.handle_new_usage_metrics_event(usage_metrics) { @@ -842,176 +615,6 @@ impl AtomaP2pNode { Ok(()) } - /// Handles incoming gossipsub messages in the P2P network. - /// - /// This method processes UsageMetrics messages by: - /// 1. Validating the message's signature and timestamp - /// 2. Verifying the node's ownership of its small ID - /// 3. Reporting the validation result to the gossipsub protocol - /// 4. Storing the node's public URL in the state manager (if this peer is a client) - /// - /// # Message Flow - /// - /// 1. Receives a message from the gossipsub network - /// 2. Skips processing if the message is from self - /// 3. Deserializes the message into a GossipMessage - /// 4. 
For UsageMetrics messages: - /// - Validates the message using `validate_usage_metrics_message` - /// - Reports validation result to gossipsub protocol - /// - If this peer is a client, stores the node's public URL in state manager - /// - /// # Arguments - /// - /// * `message_data` - Raw bytes of the received gossipsub message (CBOR encoded) - /// * `message_id` - Unique identifier for the gossipsub message - /// * `propagation_source` - The peer that forwarded this message - /// - /// # Returns - /// - /// Returns `Ok(())` if message processing succeeds, or an error if: - /// - Message deserialization fails - /// - Message validation fails - /// - Reporting validation result fails - /// - Storing public URL fails (for clients) - /// - /// # Errors - /// - /// This function can return the following errors: - /// * `UsageMetricsSerializeError` - If CBOR deserialization fails - /// * `StateManagerError` - If storing public URL fails (for clients) - /// - /// # Security Considerations - /// - /// - Messages from self are ignored to prevent message loops - /// - Messages are validated before processing - /// - Only clients store public URLs to prevent unnecessary data storage - /// - Uses CBOR for efficient binary serialization - /// - /// # Example - /// - /// ```rust,ignore - /// let message_data = // ... received from gossipsub ...; - /// let message_id = // ... message identifier ...; - /// let propagation_source = // ... peer ID ...; - /// - /// match node.handle_gossipsub_message(&message_data, &message_id, &propagation_source).await { - /// Ok(()) => println!("Message processed successfully"), - /// Err(e) => println!("Failed to process message: {}", e), - /// } - /// ``` - /// - /// # Message Validation - /// - /// Messages are validated by: - /// 1. Checking the URL format and timestamp freshness - /// 2. Verifying the cryptographic signature - /// 3. 
Confirming the node's ownership of its small ID - /// - /// Invalid messages are rejected and not propagated further in the network. - #[instrument(level = "debug", skip_all)] - pub async fn handle_gossipsub_message( - &mut self, - message_data: Bytes, - message_id: &gossipsub::MessageId, - propagation_source: &PeerId, - ) -> Result<(), AtomaP2pNodeError> { - trace!( - target = "atoma-p2p", - event = "gossipsub_message", - message_id = %message_id, - propagation_source = %propagation_source, - "Received gossipsub message" - ); - if propagation_source == self.swarm.local_peer_id() { - trace!( - target = "atoma-p2p", - event = "gossipsub_message_from_self", - "Gossipsub message from self" - ); - // Do not re-publish the node's own message, just return `Ok(()) - return Ok(()); - } - // Directly deserialize SignedNodeMessage using new method - let signed_node_message = SignedNodeMessage::deserialize_with_signature(&message_data)?; - let signature_len = signed_node_message.signature.len(); - trace!( - target = "atoma-p2p", - event = "gossipsub_message_data", - message_id = %message_id, - propagation_source = %propagation_source, - "Received gossipsub message data" - ); - let node_message = &signed_node_message.node_message; - let node_message_hash = blake3::hash(&message_data[signature_len..]); - let message_acceptance = match validate_signed_node_message( - node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &self.state_manager_sender, - ) - .await - { - Ok(()) => gossipsub::MessageAcceptance::Accept, - Err(e) => { - error!( - target = "atoma-p2p", - event = "gossipsub_message_validation_error", - error = %e, - "Failed to validate gossipsub message" - ); - // NOTE: We should reject the message if it fails to validate - // as it means the node is not being following the current protocol - if let AtomaP2pNodeError::UrlParseError(_) = e { - // We remove the peer from the gossipsub topic, because it is not a valid URL and therefore cannot be 
reached - // by clients for processing OpenAI api compatible AI requests, so these peers are not useful for the network - self.swarm - .behaviour_mut() - .gossipsub - .remove_explicit_peer(propagation_source); - } - gossipsub::MessageAcceptance::Reject - } - }; - // Report the message validation result to the gossipsub protocol - let is_in_mempool = self - .swarm - .behaviour_mut() - .gossipsub - .report_message_validation_result(message_id, propagation_source, message_acceptance); - if is_in_mempool { - trace!( - target = "atoma-p2p", - event = "gossipsub_message_in_mempool", - message_id = %message_id, - propagation_source = %propagation_source, - "Gossipsub message already in the mempool, no need to take further actions" - ); - return Ok(()); - } - // If the current peer is a client, we need to store the public URL in the state manager - if self.is_client { - let node_message = signed_node_message.node_message; - let event = AtomaP2pEvent::NodeMetricsRegistrationEvent { - public_url: node_message.node_metadata.node_public_url, - node_small_id: node_message.node_metadata.node_small_id, - timestamp: node_message.node_metadata.timestamp, - country: node_message.node_metadata.country, - node_metrics: node_message.node_metrics, - }; - self.state_manager_sender.send((event, None)).map_err(|e| { - error!( - target = "atoma-p2p", - event = "gossipsub_message_state_manager_error", - error = %e, - "Failed to send event to state manager" - ); - AtomaP2pNodeError::StateManagerError(e) - })?; - } - - Ok(()) - } - /// Handles the publishing of new usage metrics to the P2P network. 
/// /// This method performs the following operations: diff --git a/atoma-p2p/src/stack_request_response.rs b/atoma-p2p/src/stack_request_response.rs new file mode 100644 index 00000000..ad9bf729 --- /dev/null +++ b/atoma-p2p/src/stack_request_response.rs @@ -0,0 +1,81 @@ +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use sqlx::Row; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct NodeComputeRequest { + pub node_id: u64, + pub stack_small_id: i64, + pub requested_num_compute_units: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StackLeaderResponse { + pub can_proceed: bool, +} + +pub struct StackLeader { + db: PgPool, +} + +impl StackLeader { + /// Creates a new `StackLeader` backed by the given database connection pool + /// + /// # Arguments + /// * `db` - The `PgPool` this leader uses to read a stack's + /// `available_compute_units` from the `stacks` table + /// + /// # Returns + /// Returns a `StackLeader` that owns the given pool + #[must_use] + pub const fn new(db: PgPool) -> Self { + Self { db } + } + + /// Retrieves the available compute units for a specific stack from the database + /// + /// # Arguments + /// * `node_compute_request` - The `NodeComputeRequest` identifying the stack; only its + /// `stack_small_id` field is used for the lookup + /// + /// # Returns + /// Result containing the available compute units or a database error + /// + /// # Errors + /// Returns a `sqlx::Error` if the database query fails or if the stack is not found + #[allow(clippy::cast_sign_loss)] + pub async fn get_stack_available_compute_units( + &self, + node_compute_request: &NodeComputeRequest, + ) -> Result<u64, sqlx::Error> { + // Query the database for the available compute units for this stack + let mut tx = self.db.begin().await?; + + let row = + sqlx::query("SELECT available_compute_units FROM stacks WHERE 
stack_small_id = $1") + .bind(node_compute_request.stack_small_id) + .fetch_one(&mut *tx) + .await?; + + let compute_units_i64: i64 = row.get("available_compute_units"); + let available_compute_units = compute_units_i64 as u64; + tx.commit().await?; + + Ok(available_compute_units) + } + + pub async fn can_proceed( + &self, + node_compute_request: &NodeComputeRequest, + ) -> StackLeaderResponse { + self.get_stack_available_compute_units(node_compute_request) + .await + .map_or_else( + |_| StackLeaderResponse { can_proceed: false }, + |available_compute_units| StackLeaderResponse { + can_proceed: available_compute_units + >= node_compute_request.requested_num_compute_units, + }, + ) + } +} diff --git a/deny.toml b/deny.toml index db381e15..70e594a5 100644 --- a/deny.toml +++ b/deny.toml @@ -26,6 +26,14 @@ ignore = [ "RUSTSEC-2024-0370", # ring is unmaintained but is a critical dependency through sui-sdk and libp2p "RUSTSEC-2025-0007", + # protobuf stack overflow vulnerability + "RUSTSEC-2024-0437", + # ring AES functions may panic when overflow checking is enabled + "RUSTSEC-2025-0009", + # Ring versions prior to 0.17 are unmaintained + "RUSTSEC-2025-0010", + # paste is no longer maintained + "RUSTSEC-2024-0436", ] # Threshold for security vulnerabilities, any vulnerability with a CVSS score # lower than the range specified will be ignored. Note that ignored advisories