From be5e311d3709b5155c8fcac19134274ca2e093fc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:06:40 +0300 Subject: [PATCH 01/22] manager: Add connection middleware trait Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 0af49eb1..4f4818a6 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -20,10 +20,39 @@ //! Limits for the transport manager. -use crate::types::ConnectionId; +use crate::{transport::Endpoint, types::ConnectionId, PeerId}; use std::collections::HashSet; +/// A middleware trait for implementing connection limits. +pub trait ConnectionMiddleware { + /// Determines the number of outbound connections permitted to be established. + /// + /// Returns the number of allowed outbound connections. If there is no limit, + /// returns `Ok(usize::MAX)`. If the node cannot accept any more outbound + /// connections, returns an error. + fn outbound_capacity(&mut self) -> crate::Result; + + /// Checks whether a new inbound connection can be accepted before processing it. + fn check_inbound(&mut self) -> crate::Result<()>; + + /// Verifies if a new connection (inbound or outbound) can be established. + /// + /// Returns an error if connection limits or policy constraints prevent + /// establishing the connection. + fn check_connection(&mut self, peer: PeerId, endpoint: &Endpoint) -> crate::Result<()>; + + /// Registers a connection as established. + /// + /// This method will be called after a successful check using [`Self::check_connection`]. + fn register_connection(&mut self, peer: PeerId, endpoint: &Endpoint); + + /// Deregisters a connection when it is closed. + /// + /// This method will be called after a [`Self::register_connection`] call. + fn deregister_connection(&mut self, peer: PeerId, endpoint: &Endpoint); +} + /// Configuration for the connection limits. #[derive(Debug, Clone, Default)] pub struct ConnectionLimitsConfig { From 2756803ca5cb6350037837638ab356b24ab71750 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:10:09 +0300 Subject: [PATCH 02/22] config: Expose connection middleware to builder Signed-off-by: Alexandru Vasile --- src/config.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index c2956021..ad968b8f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,7 +29,8 @@ use crate::{ notification, request_response, UserProtocol, }, transport::{ - manager::limits::ConnectionLimitsConfig, tcp::config::Config as TcpConfig, + manager::limits::{ConnectionLimitsConfig, ConnectionMiddleware}, + tcp::config::Config as TcpConfig, KEEP_ALIVE_TIMEOUT, MAX_PARALLEL_DIALS, }, types::protocol::ProtocolName, @@ -124,6 +125,9 @@ pub struct ConfigBuilder { /// Close the connection if no substreams are open within this time frame. keep_alive_timeout: Duration, + + /// Connection middleware. + connection_middleware: Option>, } impl Default for ConfigBuilder { @@ -157,6 +161,7 @@ impl ConfigBuilder { known_addresses: Vec::new(), connection_limits: ConnectionLimitsConfig::default(), keep_alive_timeout: KEEP_ALIVE_TIMEOUT, + connection_middleware: None, } } @@ -272,6 +277,11 @@ impl ConfigBuilder { self } + pub fn with_connection_middleware(mut self, middleware: Box) -> Self { + self.connection_middleware = Some(middleware); + self + } + /// Set keep alive timeout for connections. pub fn with_keep_alive_timeout(mut self, timeout: Duration) -> Self { self.keep_alive_timeout = timeout; @@ -307,6 +317,7 @@ impl ConfigBuilder { known_addresses: self.known_addresses, connection_limits: self.connection_limits, keep_alive_timeout: self.keep_alive_timeout, + connection_middleware: self.connection_middleware.take(), } } } @@ -369,4 +380,7 @@ pub struct Litep2pConfig { /// Close the connection if no substreams are open within this time frame. pub(crate) keep_alive_timeout: Duration, + + /// Connection middleware. + pub(crate) connection_middleware: Option>, } From 2f8ed647b88c7e7beacee9071f33814b9a8d2576 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:13:25 +0300 Subject: [PATCH 03/22] manager: Inject middleware from the litep2p config Signed-off-by: Alexandru Vasile --- src/lib.rs | 1 + src/transport/manager/mod.rs | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 66e03289..9314b312 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -164,6 +164,7 @@ impl Litep2p { bandwidth_sink.clone(), litep2p_config.max_parallel_dials, litep2p_config.connection_limits, + litep2p_config.connection_middleware, ); // add known addresses to `TransportManager`, if any exist diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index dec079a7..00f7a1ba 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -41,6 +41,7 @@ use crate::{ use address::{scores, AddressStore}; use futures::{Stream, StreamExt}; use indexmap::IndexMap; +use limits::ConnectionMiddleware; use multiaddr::{Multiaddr, Protocol}; use multihash::Multihash; use parking_lot::RwLock; @@ -252,6 +253,9 @@ pub struct TransportManager { /// Opening connections errors. opening_errors: HashMap>, + + /// Connection middleware. + connection_middleware: Option>, } impl TransportManager { @@ -263,6 +267,7 @@ impl TransportManager { bandwidth_sink: BandwidthSink, max_parallel_dials: usize, connection_limits_config: limits::ConnectionLimitsConfig, + connection_middleware: Option>, ) -> (Self, TransportManagerHandle) { let local_peer_id = PeerId::from_public_key(&keypair.public().into()); let peers = Arc::new(RwLock::new(HashMap::new())); @@ -300,6 +305,7 @@ impl TransportManager { next_connection_id: Arc::new(AtomicUsize::new(0usize)), connection_limits: limits::ConnectionLimits::new(connection_limits_config), opening_errors: HashMap::new(), + connection_middleware, }, handle, ) From 669134b0485786de47dc72f6f90e10a431f35e6d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:29:08 +0300 Subject: [PATCH 04/22] manager: Modify trait API to match transport manager Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 4f4818a6..ef821bae 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -40,17 +40,32 @@ pub trait ConnectionMiddleware { /// /// Returns an error if connection limits or policy constraints prevent /// establishing the connection. - fn check_connection(&mut self, peer: PeerId, endpoint: &Endpoint) -> crate::Result<()>; + /// + /// # Note + /// + /// This method is called before the connection is established. However, + /// the transport manager can decide to reject the connection even if this + /// method returns `Ok(())`. Therefore, the API makes no guarantees of + /// further calling [`Self::on_connection_established`]. + /// + /// Implementations should inspect the provided parameters. To avoid leaking + /// memory, the implementation should not store the connection ID or endpoint + /// at this point in time. + fn can_accept_connection(&mut self, peer: PeerId, endpoint: &Endpoint) -> crate::Result<()>; /// Registers a connection as established. /// /// This method will be called after a successful check using [`Self::check_connection`]. - fn register_connection(&mut self, peer: PeerId, endpoint: &Endpoint); + /// The peer ID and endpoint are provided to identify the connection and are identical + /// to the ones used in [`Self::can_accept_connection`]. + fn on_connection_established(&mut self, peer: PeerId, endpoint: &Endpoint); /// Deregisters a connection when it is closed. /// /// This method will be called after a [`Self::register_connection`] call. - fn deregister_connection(&mut self, peer: PeerId, endpoint: &Endpoint); + /// The connection ID corresponds the endpoint provided in the + /// [`Self::on_connection_established`] method. + fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId); } /// Configuration for the connection limits. From 1feca62f3162b23596018edcf62eb700ee18d4ca Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:29:26 +0300 Subject: [PATCH 05/22] manager: Use the provided API Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 37 ++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 00f7a1ba..67762a83 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -448,6 +448,14 @@ impl TransportManager { pub async fn dial(&mut self, peer: PeerId) -> crate::Result<()> { // Don't alter the peer state if there's no capacity to dial. let available_capacity = self.connection_limits.on_dial_address()?; + + let middleware_capacity = if let Some(middleware) = &mut self.connection_middleware { + middleware.outbound_capacity()? + } else { + usize::MAX + }; + let available_capacity = available_capacity.min(middleware_capacity); + // The available capacity is the maximum number of connections that can be established, // so we limit the number of parallel dials to the minimum of these values. let limit = available_capacity.min(self.max_parallel_dials); @@ -522,6 +530,10 @@ impl TransportManager { pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> { self.connection_limits.on_dial_address()?; + if let Some(middleware) = &mut self.connection_middleware { + middleware.outbound_capacity()?; + } + let address_record = AddressRecord::from_multiaddr(address) .ok_or(Error::AddressError(AddressError::PeerIdMissing))?; @@ -690,6 +702,11 @@ impl TransportManager { fn on_pending_incoming_connection(&mut self) -> crate::Result<()> { self.connection_limits.on_incoming()?; + + if let Some(middleware) = &mut self.connection_middleware { + middleware.check_inbound()?; + } + Ok(()) } @@ -703,6 +720,10 @@ impl TransportManager { self.connection_limits.on_connection_closed(connection_id); + if let Some(middleware) = &mut self.connection_middleware { + middleware.on_connection_closed(peer, connection_id); + } + let mut peers = self.peers.write(); let context = peers.entry(peer).or_insert_with(|| PeerContext::default()); @@ -788,6 +809,18 @@ impl TransportManager { ); return Ok(ConnectionEstablishedResult::Reject); } + if let Some(middleware) = &mut self.connection_middleware { + if let Err(error) = middleware.can_accept_connection(peer, endpoint) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?endpoint, + ?error, + "connection middleware rejected connection", + ); + return Ok(ConnectionEstablishedResult::Reject); + } + } let mut peers = self.peers.write(); let context = peers.entry(peer).or_insert_with(|| PeerContext::default()); @@ -810,6 +843,10 @@ impl TransportManager { self.connection_limits .accept_established_connection(endpoint.connection_id(), endpoint.is_listener()); + if let Some(middleware) = &mut self.connection_middleware { + middleware.on_connection_established(peer, endpoint); + } + // Cancel all pending dials if the connection was established. if let PeerState::Opening { connection_id, From f7b724a11d207268b886be4a7b7e4e5631f3c01a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:33:06 +0300 Subject: [PATCH 06/22] manager: Implement middleware for connection limits Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index ef821bae..c84133a9 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -206,6 +206,28 @@ impl ConnectionLimits { } } +impl ConnectionMiddleware for ConnectionLimits { + fn outbound_capacity(&mut self) -> crate::Result { + self.on_dial_address().map_err(Into::into) + } + + fn check_inbound(&mut self) -> crate::Result<()> { + self.on_incoming().map_err(Into::into) + } + + fn can_accept_connection(&mut self, _peer: PeerId, endpoint: &Endpoint) -> crate::Result<()> { + self.can_accept_connection(endpoint.is_listener()).map_err(Into::into) + } + + fn on_connection_established(&mut self, _peer: PeerId, endpoint: &Endpoint) { + self.accept_established_connection(endpoint.connection_id(), endpoint.is_listener()); + } + + fn on_connection_closed(&mut self, _peer: PeerId, connection_id: ConnectionId) { + self.on_connection_closed(connection_id); + } +} + #[cfg(test)] mod tests { use super::*; From 7d70ef2998a046d233adaaf65a88dcea828f1724 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:36:10 +0300 Subject: [PATCH 07/22] config: Remove old connection limits entirely Signed-off-by: Alexandru Vasile --- src/config.rs | 11 +---------- src/lib.rs | 1 - src/transport/manager/mod.rs | 29 +---------------------------- 3 files changed, 2 insertions(+), 39 deletions(-) diff --git a/src/config.rs b/src/config.rs index ad968b8f..c11281fd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -271,12 +271,7 @@ impl ConfigBuilder { self } - /// Set connection limits configuration. - pub fn with_connection_limits(mut self, config: ConnectionLimitsConfig) -> Self { - self.connection_limits = config; - self - } - + /// Set connection middleware. pub fn with_connection_middleware(mut self, middleware: Box) -> Self { self.connection_middleware = Some(middleware); self @@ -315,7 +310,6 @@ impl ConfigBuilder { notification_protocols: self.notification_protocols, request_response_protocols: self.request_response_protocols, known_addresses: self.known_addresses, - connection_limits: self.connection_limits, keep_alive_timeout: self.keep_alive_timeout, connection_middleware: self.connection_middleware.take(), } @@ -375,9 +369,6 @@ pub struct Litep2pConfig { /// Known addresses. pub(crate) known_addresses: Vec<(PeerId, Vec)>, - /// Connection limits config. - pub(crate) connection_limits: ConnectionLimitsConfig, - /// Close the connection if no substreams are open within this time frame. pub(crate) keep_alive_timeout: Duration, diff --git a/src/lib.rs b/src/lib.rs index 9314b312..c1f42d99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,7 +163,6 @@ impl Litep2p { supported_transports, bandwidth_sink.clone(), litep2p_config.max_parallel_dials, - litep2p_config.connection_limits, litep2p_config.connection_middleware, ); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 67762a83..aa3391c7 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -248,9 +248,6 @@ pub struct TransportManager { /// Pending connections. pending_connections: HashMap, - /// Connection limits. - connection_limits: limits::ConnectionLimits, - /// Opening connections errors. opening_errors: HashMap>, @@ -266,7 +263,6 @@ impl TransportManager { supported_transports: HashSet, bandwidth_sink: BandwidthSink, max_parallel_dials: usize, - connection_limits_config: limits::ConnectionLimitsConfig, connection_middleware: Option>, ) -> (Self, TransportManagerHandle) { let local_peer_id = PeerId::from_public_key(&keypair.public().into()); @@ -303,7 +299,6 @@ impl TransportManager { pending_connections: HashMap::new(), next_substream_id: Arc::new(AtomicUsize::new(0usize)), next_connection_id: Arc::new(AtomicUsize::new(0usize)), - connection_limits: limits::ConnectionLimits::new(connection_limits_config), opening_errors: HashMap::new(), connection_middleware, }, @@ -447,14 +442,11 @@ impl TransportManager { /// Returns an error if the peer is unknown or the peer is already connected. pub async fn dial(&mut self, peer: PeerId) -> crate::Result<()> { // Don't alter the peer state if there's no capacity to dial. - let available_capacity = self.connection_limits.on_dial_address()?; - - let middleware_capacity = if let Some(middleware) = &mut self.connection_middleware { + let available_capacity = if let Some(middleware) = &mut self.connection_middleware { middleware.outbound_capacity()? } else { usize::MAX }; - let available_capacity = available_capacity.min(middleware_capacity); // The available capacity is the maximum number of connections that can be established, // so we limit the number of parallel dials to the minimum of these values. @@ -528,8 +520,6 @@ impl TransportManager { /// /// Returns an error if address it not valid. pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> { - self.connection_limits.on_dial_address()?; - if let Some(middleware) = &mut self.connection_middleware { middleware.outbound_capacity()?; } @@ -701,8 +691,6 @@ impl TransportManager { } fn on_pending_incoming_connection(&mut self) -> crate::Result<()> { - self.connection_limits.on_incoming()?; - if let Some(middleware) = &mut self.connection_middleware { middleware.check_inbound()?; } @@ -718,8 +706,6 @@ impl TransportManager { ) -> Option { tracing::trace!(target: LOG_TARGET, ?peer, ?connection_id, "connection closed"); - self.connection_limits.on_connection_closed(connection_id); - if let Some(middleware) = &mut self.connection_middleware { middleware.on_connection_closed(peer, connection_id); } @@ -799,16 +785,6 @@ impl TransportManager { }; // Reject the connection if exceeded limits. - if let Err(error) = self.connection_limits.can_accept_connection(endpoint.is_listener()) { - tracing::debug!( - target: LOG_TARGET, - ?peer, - ?endpoint, - ?error, - "connection limit exceeded, rejecting connection", - ); - return Ok(ConnectionEstablishedResult::Reject); - } if let Some(middleware) = &mut self.connection_middleware { if let Err(error) = middleware.can_accept_connection(peer, endpoint) { tracing::debug!( @@ -840,9 +816,6 @@ impl TransportManager { ); if connection_accepted { - self.connection_limits - .accept_established_connection(endpoint.connection_id(), endpoint.is_listener()); - if let Some(middleware) = &mut self.connection_middleware { middleware.on_connection_established(peer, endpoint); } From 0aac72c180686c1c078f13e38ebc1324fa945706 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:40:28 +0300 Subject: [PATCH 08/22] manager: Make connection limts inner functions private Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index c84133a9..71611786 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -100,7 +100,10 @@ pub enum ConnectionLimitsError { MaxOutgoingConnectionsExceeded, } -/// Connection limits. +/// General connection limits. +/// +/// This is a type of connection middleware that places limits on the number +/// of incoming and outgoing connections. #[derive(Debug, Clone)] pub struct ConnectionLimits { /// Configuration for the connection limits. @@ -133,7 +136,7 @@ impl ConnectionLimits { /// single address. /// /// If the maximum number of outgoing connections is not set, `Ok(usize::MAX)` is returned. - pub fn on_dial_address(&mut self) -> Result { + fn on_dial_address(&mut self) -> Result { if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { if self.outgoing_connections.len() >= max_outgoing_connections { return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded); @@ -146,7 +149,7 @@ impl ConnectionLimits { } /// Called before accepting a new incoming connection. - pub fn on_incoming(&mut self) -> Result<(), ConnectionLimitsError> { + fn on_incoming(&mut self) -> Result<(), ConnectionLimitsError> { if let Some(max_incoming_connections) = self.config.max_incoming_connections { if self.incoming_connections.len() >= max_incoming_connections { return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded); @@ -159,10 +162,7 @@ impl ConnectionLimits { /// Called when a new connection is established. /// /// Returns an error if the connection cannot be accepted due to connection limits. - pub fn can_accept_connection( - &mut self, - is_listener: bool, - ) -> Result<(), ConnectionLimitsError> { + fn can_accept_connection(&mut self, is_listener: bool) -> Result<(), ConnectionLimitsError> { // Check connection limits. if is_listener { if let Some(max_incoming_connections) = self.config.max_incoming_connections { @@ -185,11 +185,7 @@ impl ConnectionLimits { /// /// This method should be called after the `Self::can_accept_connection` method /// to ensure that the connection can be accepted. - pub fn accept_established_connection( - &mut self, - connection_id: ConnectionId, - is_listener: bool, - ) { + fn accept_established_connection(&mut self, connection_id: ConnectionId, is_listener: bool) { if is_listener { if self.config.max_incoming_connections.is_some() { self.incoming_connections.insert(connection_id); @@ -200,7 +196,7 @@ impl ConnectionLimits { } /// Called when a connection is closed. - pub fn on_connection_closed(&mut self, connection_id: ConnectionId) { + fn on_connection_closed(&mut self, connection_id: ConnectionId) { self.incoming_connections.remove(&connection_id); self.outgoing_connections.remove(&connection_id); } From 942981e900af4b99e18f1f43545d471ffce63c75 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:44:42 +0300 Subject: [PATCH 09/22] manager: Expose ConnectionLimits struct to users Signed-off-by: Alexandru Vasile --- src/config.rs | 7 +--- src/transport/manager/limits.rs | 69 +++++++-------------------------- src/transport/mod.rs | 4 +- 3 files changed, 19 insertions(+), 61 deletions(-) diff --git a/src/config.rs b/src/config.rs index c11281fd..8cd9ff98 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,8 +29,7 @@ use crate::{ notification, request_response, UserProtocol, }, transport::{ - manager::limits::{ConnectionLimitsConfig, ConnectionMiddleware}, - tcp::config::Config as TcpConfig, + manager::limits::ConnectionMiddleware, tcp::config::Config as TcpConfig, KEEP_ALIVE_TIMEOUT, MAX_PARALLEL_DIALS, }, types::protocol::ProtocolName, @@ -120,9 +119,6 @@ pub struct ConfigBuilder { /// Maximum number of parallel dial attempts. max_parallel_dials: usize, - /// Connection limits config. - connection_limits: ConnectionLimitsConfig, - /// Close the connection if no substreams are open within this time frame. keep_alive_timeout: Duration, @@ -159,7 +155,6 @@ impl ConfigBuilder { notification_protocols: HashMap::new(), request_response_protocols: HashMap::new(), known_addresses: Vec::new(), - connection_limits: ConnectionLimitsConfig::default(), keep_alive_timeout: KEEP_ALIVE_TIMEOUT, connection_middleware: None, } diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 71611786..f1829779 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -127,19 +127,13 @@ impl ConnectionLimits { outgoing_connections: HashSet::with_capacity(max_outgoing_connections), } } +} - /// Called when dialing an address. - /// - /// Returns the number of outgoing connections permitted to be established. - /// It is guaranteed that at least one connection can be established if the method returns `Ok`. - /// The number of available outgoing connections can influence the maximum parallel dials to a - /// single address. - /// - /// If the maximum number of outgoing connections is not set, `Ok(usize::MAX)` is returned. - fn on_dial_address(&mut self) -> Result { +impl ConnectionMiddleware for ConnectionLimits { + fn outbound_capacity(&mut self) -> crate::Result { if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { if self.outgoing_connections.len() >= max_outgoing_connections { - return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded); + return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded.into()); } return Ok(max_outgoing_connections - self.outgoing_connections.len()); @@ -148,82 +142,49 @@ impl ConnectionLimits { Ok(usize::MAX) } - /// Called before accepting a new incoming connection. - fn on_incoming(&mut self) -> Result<(), ConnectionLimitsError> { + fn check_inbound(&mut self) -> crate::Result<()> { if let Some(max_incoming_connections) = self.config.max_incoming_connections { if self.incoming_connections.len() >= max_incoming_connections { - return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded); + return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded.into()); } } Ok(()) } - /// Called when a new connection is established. - /// - /// Returns an error if the connection cannot be accepted due to connection limits. - fn can_accept_connection(&mut self, is_listener: bool) -> Result<(), ConnectionLimitsError> { + fn can_accept_connection(&mut self, _peer: PeerId, endpoint: &Endpoint) -> crate::Result<()> { // Check connection limits. - if is_listener { + if endpoint.is_listener() { if let Some(max_incoming_connections) = self.config.max_incoming_connections { if self.incoming_connections.len() >= max_incoming_connections { - return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded); + return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded.into()); } } } else if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { if self.outgoing_connections.len() >= max_outgoing_connections { - return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded); + return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded.into()); } } Ok(()) } - /// Accept an established connection. - /// - /// # Note - /// - /// This method should be called after the `Self::can_accept_connection` method - /// to ensure that the connection can be accepted. - fn accept_established_connection(&mut self, connection_id: ConnectionId, is_listener: bool) { - if is_listener { + fn on_connection_established(&mut self, _peer: PeerId, endpoint: &Endpoint) { + if endpoint.is_listener() { if self.config.max_incoming_connections.is_some() { - self.incoming_connections.insert(connection_id); + self.incoming_connections.insert(endpoint.connection_id()); } } else if self.config.max_outgoing_connections.is_some() { - self.outgoing_connections.insert(connection_id); + self.outgoing_connections.insert(endpoint.connection_id()); } } - /// Called when a connection is closed. - fn on_connection_closed(&mut self, connection_id: ConnectionId) { + fn on_connection_closed(&mut self, _peer: PeerId, connection_id: ConnectionId) { self.incoming_connections.remove(&connection_id); self.outgoing_connections.remove(&connection_id); } } -impl ConnectionMiddleware for ConnectionLimits { - fn outbound_capacity(&mut self) -> crate::Result { - self.on_dial_address().map_err(Into::into) - } - - fn check_inbound(&mut self) -> crate::Result<()> { - self.on_incoming().map_err(Into::into) - } - - fn can_accept_connection(&mut self, _peer: PeerId, endpoint: &Endpoint) -> crate::Result<()> { - self.can_accept_connection(endpoint.is_listener()).map_err(Into::into) - } - - fn on_connection_established(&mut self, _peer: PeerId, endpoint: &Endpoint) { - self.accept_established_connection(endpoint.connection_id(), endpoint.is_listener()); - } - - fn on_connection_closed(&mut self, _peer: PeerId, connection_id: ConnectionId) { - self.on_connection_closed(connection_id); - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 1bf9b7d9..e17a9ff7 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -39,7 +39,9 @@ pub mod websocket; pub(crate) mod dummy; pub(crate) mod manager; -pub use manager::limits::{ConnectionLimitsConfig, ConnectionLimitsError}; +pub use manager::limits::{ + ConnectionLimits, ConnectionLimitsConfig, ConnectionLimitsError, ConnectionMiddleware, +}; /// Timeout for opening a connection. pub(crate) const CONNECTION_OPEN_TIMEOUT: Duration = Duration::from_secs(10); From 3493398ff8f368756b7397972ce51ffa49f8dbe4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:49:49 +0300 Subject: [PATCH 10/22] manager: Add more docs and impl unusual methods Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index f1829779..037aa012 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -25,16 +25,29 @@ use crate::{transport::Endpoint, types::ConnectionId, PeerId}; use std::collections::HashSet; /// A middleware trait for implementing connection limits. +/// +/// The middleware interacts with the transport manager at two entry points: +/// - before the connection is negotiated via [`Self::outbound_capacity`] and +/// [`Self::check_inbound`] +/// - after the connection is established via [`Self::can_accept_connection`], +/// [`Self::on_connection_established`] and [`Self::on_connection_closed`]. +/// +/// Returning an error from any of the methods will prevent the connection from being +/// accepted by the transport manager. pub trait ConnectionMiddleware { /// Determines the number of outbound connections permitted to be established. /// - /// Returns the number of allowed outbound connections. If there is no limit, - /// returns `Ok(usize::MAX)`. If the node cannot accept any more outbound - /// connections, returns an error. - fn outbound_capacity(&mut self) -> crate::Result; + /// Returns the number of allowed outbound connections. + /// If there is no limit, returns `Ok(usize::MAX)`. + /// If the node cannot accept any more outbound connections, returns an error. + fn outbound_capacity(&mut self) -> crate::Result { + Ok(usize::MAX) + } /// Checks whether a new inbound connection can be accepted before processing it. - fn check_inbound(&mut self) -> crate::Result<()>; + fn check_inbound(&mut self) -> crate::Result<()> { + Ok(()) + } /// Verifies if a new connection (inbound or outbound) can be established. /// From 37ffd6ab33fa671b2eccbe181c76b686413c7618 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 16:57:13 +0300 Subject: [PATCH 11/22] manager/tests: Adjust testing to the new interface Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 50 +++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 037aa012..35352a0a 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -217,46 +217,66 @@ mod tests { let connection_id_in_3 = ConnectionId::random(); // Establish incoming connection. - assert!(limits.can_accept_connection(true).is_ok()); - limits.accept_established_connection(connection_id_in_1, true); + let endpoint = Endpoint::Listener { + address: Default::default(), + connection_id: connection_id_in_1, + }; + assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); + limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 1); - assert!(limits.can_accept_connection(true).is_ok()); - limits.accept_established_connection(connection_id_in_2, true); + let endpoint = Endpoint::Listener { + address: Default::default(), + connection_id: connection_id_in_2, + }; + assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); + limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 2); - assert!(limits.can_accept_connection(true).is_ok()); - limits.accept_established_connection(connection_id_in_3, true); + let endpoint = Endpoint::Listener { + address: Default::default(), + connection_id: connection_id_in_3, + }; + assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); + limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!( - limits.can_accept_connection(true).unwrap_err(), + limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), ConnectionLimitsError::MaxIncomingConnectionsExceeded ); assert_eq!(limits.incoming_connections.len(), 3); // Establish outgoing connection. - assert!(limits.can_accept_connection(false).is_ok()); - limits.accept_established_connection(connection_id_out_1, false); + let endpoint = Endpoint::Dialer { + address: Default::default(), + connection_id: connection_id_out_1, + }; + assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); + limits.accept_established_connection(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!(limits.outgoing_connections.len(), 1); - assert!(limits.can_accept_connection(false).is_ok()); - limits.accept_established_connection(connection_id_out_2, false); + let endpoint = Endpoint::Dialer { + address: Default::default(), + connection_id: connection_id_out_2, + }; + assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); + limits.accept_established_connection(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!(limits.outgoing_connections.len(), 2); assert_eq!( - limits.can_accept_connection(false).unwrap_err(), + limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), ConnectionLimitsError::MaxOutgoingConnectionsExceeded ); - // Close connections with peer a. - limits.on_connection_closed(connection_id_in_1); + // Close connections with 1. + limits.on_connection_closed(PeerId::random(), connection_id_in_1); assert_eq!(limits.incoming_connections.len(), 2); assert_eq!(limits.outgoing_connections.len(), 2); - limits.on_connection_closed(connection_id_out_1); + limits.on_connection_closed(PeerId::random(), connection_id_out_1); assert_eq!(limits.incoming_connections.len(), 2); assert_eq!(limits.outgoing_connections.len(), 1); } From fdda69a44b60e4dbe4eae51f66c9b69155a98eef Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Apr 2025 17:17:36 +0300 Subject: [PATCH 12/22] tests: Adjust testing Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 4 +- src/protocol/mdns.rs | 9 ++- src/protocol/notification/tests/mod.rs | 4 +- src/protocol/request_response/tests.rs | 4 +- src/transport/manager/limits.rs | 34 ++++----- src/transport/manager/mod.rs | 96 ++++++++++++++------------ src/transport/tcp/mod.rs | 2 +- 7 files changed, 81 insertions(+), 72 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index aa5c20f4..17cf9180 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1232,7 +1232,7 @@ mod tests { crypto::ed25519::Keypair, transport::{ manager::{limits::ConnectionLimitsConfig, TransportManager}, - KEEP_ALIVE_TIMEOUT, + ConnectionLimits, KEEP_ALIVE_TIMEOUT, }, types::protocol::ProtocolName, BandwidthSink, @@ -1251,7 +1251,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); diff --git a/src/protocol/mdns.rs b/src/protocol/mdns.rs index 7ea8f30d..eb6d83bb 100644 --- a/src/protocol/mdns.rs +++ b/src/protocol/mdns.rs @@ -336,7 +336,10 @@ mod tests { use super::*; use crate::{ crypto::ed25519::Keypair, - transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, + transport::{ + manager::{limits::ConnectionLimitsConfig, TransportManager}, + ConnectionLimits, + }, BandwidthSink, }; use futures::StreamExt; @@ -354,7 +357,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let mdns1 = Mdns::new( @@ -377,7 +380,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let mdns2 = Mdns::new( diff --git a/src/protocol/notification/tests/mod.rs b/src/protocol/notification/tests/mod.rs index 4aa48aa4..d8279105 100644 --- a/src/protocol/notification/tests/mod.rs +++ b/src/protocol/notification/tests/mod.rs @@ -31,7 +31,7 @@ use crate::{ }, transport::{ manager::{limits::ConnectionLimitsConfig, TransportManager}, - KEEP_ALIVE_TIMEOUT, + ConnectionLimits, KEEP_ALIVE_TIMEOUT, }, types::protocol::ProtocolName, BandwidthSink, PeerId, @@ -56,7 +56,7 @@ fn make_notification_protocol() -> ( HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 32cc65e7..6d0d58f6 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -31,7 +31,7 @@ use crate::{ substream::Substream, transport::{ manager::{limits::ConnectionLimitsConfig, TransportManager}, - KEEP_ALIVE_TIMEOUT, + ConnectionLimits, KEEP_ALIVE_TIMEOUT, }, types::{RequestId, SubstreamId}, BandwidthSink, Error, PeerId, ProtocolName, @@ -54,7 +54,7 @@ fn protocol() -> ( HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 35352a0a..2ab4bc2d 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -34,7 +34,7 @@ use std::collections::HashSet; /// /// Returning an error from any of the methods will prevent the connection from being /// accepted by the transport manager. -pub trait ConnectionMiddleware { +pub trait ConnectionMiddleware: Send { /// Determines the number of outbound connections permitted to be established. /// /// Returns the number of allowed outbound connections. @@ -218,7 +218,7 @@ mod tests { // Establish incoming connection. let endpoint = Endpoint::Listener { - address: Default::default(), + address: multiaddr::Multiaddr::empty(), connection_id: connection_id_in_1, }; assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); @@ -226,7 +226,7 @@ mod tests { assert_eq!(limits.incoming_connections.len(), 1); let endpoint = Endpoint::Listener { - address: Default::default(), + address: multiaddr::Multiaddr::empty(), connection_id: connection_id_in_2, }; assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); @@ -234,42 +234,44 @@ mod tests { assert_eq!(limits.incoming_connections.len(), 2); let endpoint = Endpoint::Listener { - address: Default::default(), + address: multiaddr::Multiaddr::empty(), connection_id: connection_id_in_3, }; assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); - assert_eq!( - limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), - ConnectionLimitsError::MaxIncomingConnectionsExceeded - ); + let err = limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(); + // assert_eq!( + // limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), + // ConnectionLimitsError::MaxIncomingConnectionsExceeded.into(), + // ); assert_eq!(limits.incoming_connections.len(), 3); // Establish outgoing connection. let endpoint = Endpoint::Dialer { - address: Default::default(), + address: multiaddr::Multiaddr::empty(), connection_id: connection_id_out_1, }; assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); - limits.accept_established_connection(PeerId::random(), &endpoint); + limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!(limits.outgoing_connections.len(), 1); let endpoint = Endpoint::Dialer { - address: Default::default(), + address: multiaddr::Multiaddr::empty(), connection_id: connection_id_out_2, }; assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_ok()); - limits.accept_established_connection(PeerId::random(), &endpoint); + limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!(limits.outgoing_connections.len(), 2); - assert_eq!( - limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), - ConnectionLimitsError::MaxOutgoingConnectionsExceeded - ); + let err = limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(); + // assert_eq!( + // limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), + // ConnectionLimitsError::MaxOutgoingConnectionsExceeded.into(), + // ); // Close connections with 1. limits.on_connection_closed(PeerId::random(), connection_id_in_1); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index aa3391c7..a3c89ae1 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -1334,7 +1334,7 @@ impl TransportManager { #[cfg(test)] mod tests { use crate::transport::manager::{address::AddressStore, peer_state::SecondaryOrDialing}; - use limits::ConnectionLimitsConfig; + use limits::{ConnectionLimits, ConnectionLimitsConfig}; use multihash::Multihash; @@ -1521,7 +1521,7 @@ mod tests { HashSet::new(), sink, 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_protocol( @@ -1548,7 +1548,7 @@ mod tests { HashSet::new(), sink, 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_protocol( @@ -1578,7 +1578,7 @@ mod tests { HashSet::new(), sink, 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_protocol( @@ -1611,7 +1611,7 @@ mod tests { HashSet::new(), sink, 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1628,7 +1628,7 @@ mod tests { HashSet::new(), sink, 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); assert!(manager.dial(local_peer_id).await.is_err()); @@ -1641,7 +1641,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1671,7 +1671,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); let dial_address = Multiaddr::empty() @@ -1733,7 +1733,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1764,7 +1764,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1809,7 +1809,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1828,7 +1828,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1861,7 +1861,7 @@ mod tests { transports, BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); // ipv6 @@ -1923,7 +1923,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1990,7 +1990,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2077,7 +2077,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2162,7 +2162,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2271,7 +2271,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2367,7 +2367,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2476,7 +2476,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2580,7 +2580,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2724,7 +2724,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.on_dial_failure(ConnectionId::random()).unwrap(); @@ -2743,7 +2743,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.on_connection_closed(PeerId::random(), ConnectionId::random()).unwrap(); } @@ -2761,7 +2761,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager .on_connection_opened( @@ -2785,7 +2785,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2809,7 +2809,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2836,7 +2836,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager @@ -2857,7 +2857,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2877,7 +2877,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); assert!(manager.next().await.is_none()); @@ -2890,7 +2890,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = { @@ -2938,7 +2938,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = { @@ -3001,7 +3001,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = { @@ -3044,7 +3044,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); // transport doesn't start with ip/dns @@ -3110,7 +3110,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); async fn call_manager(manager: &mut TransportManager, address: Multiaddr) { @@ -3164,7 +3164,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); let dial_address = Multiaddr::empty() @@ -3250,7 +3250,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); let dial_address = Multiaddr::empty() @@ -3338,9 +3338,11 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default() - .max_incoming_connections(Some(3)) - .max_outgoing_connections(Some(2)), + Some(Box::new(ConnectionLimits::new( + ConnectionLimitsConfig::default() + .max_incoming_connections(Some(3)) + .max_outgoing_connections(Some(2)), + ))), ); // The connection limit is agnostic of the underlying transports. manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3414,9 +3416,11 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default() - .max_incoming_connections(Some(3)) - .max_outgoing_connections(Some(2)), + Some(Box::new(ConnectionLimits::new( + ConnectionLimitsConfig::default() + .max_incoming_connections(Some(3)) + .max_outgoing_connections(Some(2)), + ))), ); // The connection limit is agnostic of the underlying transports. manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3503,7 +3507,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3556,7 +3560,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3708,7 +3712,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); let dial_address = Multiaddr::empty() @@ -3794,7 +3798,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + Some(Box::new(ConnectionLimits::new(Default::default()))), ); let peer = PeerId::random(); let connection_id = ConnectionId::from(0); diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 748e138d..8b074fc3 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -979,7 +979,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, - ConnectionLimitsConfig::default(), + None, ); let handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport( From e8fb1bd01f98a43af52a48206abaa9f76e94058a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 15:02:47 +0300 Subject: [PATCH 13/22] manager: Expose connection id to the inbound check step Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 10 +++------- src/transport/manager/mod.rs | 6 +++--- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 2ab4bc2d..47220c44 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -40,14 +40,10 @@ pub trait ConnectionMiddleware: Send { /// Returns the number of allowed outbound connections. /// If there is no limit, returns `Ok(usize::MAX)`. /// If the node cannot accept any more outbound connections, returns an error. - fn outbound_capacity(&mut self) -> crate::Result { - Ok(usize::MAX) - } + fn outbound_capacity(&mut self) -> crate::Result; /// Checks whether a new inbound connection can be accepted before processing it. - fn check_inbound(&mut self) -> crate::Result<()> { - Ok(()) - } + fn check_inbound(&mut self, connection_id: ConnectionId) -> crate::Result<()>; /// Verifies if a new connection (inbound or outbound) can be established. /// @@ -155,7 +151,7 @@ impl ConnectionMiddleware for ConnectionLimits { Ok(usize::MAX) } - fn check_inbound(&mut self) -> crate::Result<()> { + fn check_inbound(&mut self, _connection_id: ConnectionId) -> crate::Result<()> { if let Some(max_incoming_connections) = self.config.max_incoming_connections { if self.incoming_connections.len() >= max_incoming_connections { return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded.into()); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index a3c89ae1..b4843a90 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -690,9 +690,9 @@ impl TransportManager { Ok(()) } - fn on_pending_incoming_connection(&mut self) -> crate::Result<()> { + fn on_pending_incoming_connection(&mut self, connection_id: ConnectionId) -> crate::Result<()> { if let Some(middleware) = &mut self.connection_middleware { - middleware.check_inbound()?; + middleware.check_inbound(connection_id)?; } Ok(()) @@ -1297,7 +1297,7 @@ impl TransportManager { } }, TransportEvent::PendingInboundConnection { connection_id } => { - if self.on_pending_incoming_connection().is_ok() { + if self.on_pending_incoming_connection(connection_id).is_ok() { tracing::trace!( target: LOG_TARGET, ?connection_id, From 5e04ddfb13f56b9e83f365fccbfb7ca7fce05fd7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 15:10:31 +0300 Subject: [PATCH 14/22] manager: Open API to capture peer IDs before dialing Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 6 ++++-- src/transport/manager/mod.rs | 7 +++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 47220c44..2721c53b 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -37,10 +37,12 @@ use std::collections::HashSet; pub trait ConnectionMiddleware: Send { /// Determines the number of outbound connections permitted to be established. /// + /// This method is called before the node attempts to dial a remote peer. + /// /// Returns the number of allowed outbound connections. /// If there is no limit, returns `Ok(usize::MAX)`. /// If the node cannot accept any more outbound connections, returns an error. - fn outbound_capacity(&mut self) -> crate::Result; + fn outbound_capacity(&mut self, peer: PeerId) -> crate::Result; /// Checks whether a new inbound connection can be accepted before processing it. fn check_inbound(&mut self, connection_id: ConnectionId) -> crate::Result<()>; @@ -139,7 +141,7 @@ impl ConnectionLimits { } impl ConnectionMiddleware for ConnectionLimits { - fn outbound_capacity(&mut self) -> crate::Result { + fn outbound_capacity(&mut self, _peer: PeerId) -> crate::Result { if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { if self.outgoing_connections.len() >= max_outgoing_connections { return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded.into()); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index b4843a90..c486d40c 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -443,7 +443,7 @@ impl TransportManager { pub async fn dial(&mut self, peer: PeerId) -> crate::Result<()> { // Don't alter the peer state if there's no capacity to dial. let available_capacity = if let Some(middleware) = &mut self.connection_middleware { - middleware.outbound_capacity()? + middleware.outbound_capacity(peer)? } else { usize::MAX }; @@ -521,7 +521,10 @@ impl TransportManager { /// Returns an error if address it not valid. pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> { if let Some(middleware) = &mut self.connection_middleware { - middleware.outbound_capacity()?; + let peer = PeerId::try_from_multiaddr(&address) + .ok_or(Error::AddressError(AddressError::PeerIdMissing))?; + + middleware.outbound_capacity(peer)?; } let address_record = AddressRecord::from_multiaddr(address) From 4a56968ce9efbc05077c876c392370113b3e6642 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 15:11:29 +0300 Subject: [PATCH 15/22] manager: Enhance docs Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 2721c53b..40675ef2 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -45,6 +45,10 @@ pub trait ConnectionMiddleware: Send { fn outbound_capacity(&mut self, peer: PeerId) -> crate::Result; /// Checks whether a new inbound connection can be accepted before processing it. + /// + /// At this point, no protocol negotiation has occurred and the peer identity is + /// unknown. The connection ID provided is the one that will be used for the + /// connection. fn check_inbound(&mut self, connection_id: ConnectionId) -> crate::Result<()>; /// Verifies if a new connection (inbound or outbound) can be established. From 30bd3ea2bf658f8e861643e44f98bcb7ca30f233 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 15:18:32 +0300 Subject: [PATCH 16/22] transport: Refactor `PendingInboundConnection` event with socket addr Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 2 +- src/transport/mod.rs | 5 ++++- src/transport/quic/mod.rs | 1 + src/transport/tcp/mod.rs | 1 + src/transport/websocket/mod.rs | 1 + 5 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index c486d40c..09da7e78 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -1299,7 +1299,7 @@ impl TransportManager { } } }, - TransportEvent::PendingInboundConnection { connection_id } => { + TransportEvent::PendingInboundConnection { connection_id, .. } => { if self.on_pending_incoming_connection(connection_id).is_ok() { tracing::trace!( target: LOG_TARGET, diff --git a/src/transport/mod.rs b/src/transport/mod.rs index e17a9ff7..d71cb258 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -25,7 +25,7 @@ use crate::{error::DialError, transport::manager::TransportHandle, types::Connec use futures::Stream; use multiaddr::Multiaddr; -use std::{fmt::Debug, time::Duration}; +use std::{fmt::Debug, net::SocketAddr, time::Duration}; pub(crate) mod common; #[cfg(feature = "quic")] @@ -131,6 +131,9 @@ pub(crate) enum TransportEvent { PendingInboundConnection { /// Connection ID. connection_id: ConnectionId, + + /// The socket address which initiated the connection. + address: SocketAddr, }, /// Connection opened to remote but not yet negotiated. diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 0cf5e255..0e9cdf60 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -501,6 +501,7 @@ impl Stream for QuicTransport { return Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), })); } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 8b074fc3..49fd599c 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -579,6 +579,7 @@ impl Stream for TcpTransport { Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, + address, })) } }; diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 2435f639..d65a0106 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -625,6 +625,7 @@ impl Stream for WebSocketTransport { Poll::Ready(Some(TransportEvent::PendingInboundConnection { connection_id, + address, })) } }; From f8c36a3d3d2922f0891dff1a758e5c4c21742956 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 15:20:55 +0300 Subject: [PATCH 17/22] manager: Enhance API with pure socket address Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 14 +++++++++++--- src/transport/manager/mod.rs | 13 +++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 40675ef2..ce140526 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -22,7 +22,7 @@ use crate::{transport::Endpoint, types::ConnectionId, PeerId}; -use std::collections::HashSet; +use std::{collections::HashSet, net::SocketAddr}; /// A middleware trait for implementing connection limits. /// @@ -49,7 +49,11 @@ pub trait ConnectionMiddleware: Send { /// At this point, no protocol negotiation has occurred and the peer identity is /// unknown. The connection ID provided is the one that will be used for the /// connection. - fn check_inbound(&mut self, connection_id: ConnectionId) -> crate::Result<()>; + fn check_inbound( + &mut self, + connection_id: ConnectionId, + address: SocketAddr, + ) -> crate::Result<()>; /// Verifies if a new connection (inbound or outbound) can be established. /// @@ -157,7 +161,11 @@ impl ConnectionMiddleware for ConnectionLimits { Ok(usize::MAX) } - fn check_inbound(&mut self, _connection_id: ConnectionId) -> crate::Result<()> { + fn check_inbound( + &mut self, + _connection_id: ConnectionId, + _address: SocketAddr, + ) -> crate::Result<()> { if let Some(max_incoming_connections) = self.config.max_incoming_connections { if self.incoming_connections.len() >= max_incoming_connections { return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded.into()); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 09da7e78..f00f9899 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -49,6 +49,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use std::{ collections::{HashMap, HashSet}, + net::SocketAddr, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, @@ -693,9 +694,13 @@ impl TransportManager { Ok(()) } - fn on_pending_incoming_connection(&mut self, connection_id: ConnectionId) -> crate::Result<()> { + fn on_pending_incoming_connection( + &mut self, + connection_id: ConnectionId, + address: SocketAddr, + ) -> crate::Result<()> { if let Some(middleware) = &mut self.connection_middleware { - middleware.check_inbound(connection_id)?; + middleware.check_inbound(connection_id, address)?; } Ok(()) @@ -1299,8 +1304,8 @@ impl TransportManager { } } }, - TransportEvent::PendingInboundConnection { connection_id, .. } => { - if self.on_pending_incoming_connection(connection_id).is_ok() { + TransportEvent::PendingInboundConnection { connection_id, address } => { + if self.on_pending_incoming_connection(connection_id, address).is_ok() { tracing::trace!( target: LOG_TARGET, ?connection_id, From 4754dc190b1c4406b32cc6fcfcad4739f1825277 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 15:24:28 +0300 Subject: [PATCH 18/22] tests: Adjust testing Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 5 +++++ src/transport/quic/mod.rs | 2 +- src/transport/tcp/mod.rs | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index f00f9899..20144699 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -1345,6 +1345,7 @@ mod tests { use limits::{ConnectionLimits, ConnectionLimitsConfig}; use multihash::Multihash; + use std::net::IpAddr; use super::*; use crate::{ @@ -1452,6 +1453,7 @@ mod tests { tx_ws .send(TransportEvent::PendingInboundConnection { connection_id: ConnectionId::from(1), + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), }) .await .expect("chanel to be open"); @@ -1470,6 +1472,7 @@ mod tests { tx_tcp .send(TransportEvent::PendingInboundConnection { connection_id: ConnectionId::from(2), + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), }) .await .expect("chanel to be open"); @@ -1488,12 +1491,14 @@ mod tests { tx_ws .send(TransportEvent::PendingInboundConnection { connection_id: ConnectionId::from(3), + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), }) .await .expect("chanel to be open"); tx_tcp .send(TransportEvent::PendingInboundConnection { connection_id: ConnectionId::from(4), + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), }) .await .expect("chanel to be open"); diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 0e9cdf60..32c75c2f 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -660,7 +660,7 @@ mod tests { let event = transport1.next().await.unwrap(); match event { - TransportEvent::PendingInboundConnection { connection_id } => { + TransportEvent::PendingInboundConnection { connection_id, .. } => { transport1.accept_pending(connection_id).unwrap(); } _ => panic!("unexpected event"), diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 49fd599c..499b3111 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -770,7 +770,7 @@ mod tests { let event = transport1.next().await.unwrap(); match event { - TransportEvent::PendingInboundConnection { connection_id } => { + TransportEvent::PendingInboundConnection { connection_id, .. } => { transport1.accept_pending(connection_id).unwrap(); } _ => panic!("unexpected event"), @@ -864,7 +864,7 @@ mod tests { // Reject connection. let event = transport1.next().await.unwrap(); match event { - TransportEvent::PendingInboundConnection { connection_id } => { + TransportEvent::PendingInboundConnection { connection_id, .. } => { transport1.reject_pending(connection_id).unwrap(); } _ => panic!("unexpected event"), From c65cccdfddac79e4288533c899ce6fd4325d44ec Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 18:01:48 +0300 Subject: [PATCH 19/22] manager: Update docs Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 48 ++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index ce140526..d170d27a 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -24,24 +24,52 @@ use crate::{transport::Endpoint, types::ConnectionId, PeerId}; use std::{collections::HashSet, net::SocketAddr}; -/// A middleware trait for implementing connection limits. +/// A middleware trait for managing connections. /// -/// The middleware interacts with the transport manager at two entry points: -/// - before the connection is negotiated via [`Self::outbound_capacity`] and -/// [`Self::check_inbound`] -/// - after the connection is established via [`Self::can_accept_connection`], -/// [`Self::on_connection_established`] and [`Self::on_connection_closed`]. +/// This middleware allows developers to implement custom connection policies, +/// enabling a wide range of use cases by exposing hooks into the connection lifecycle. /// -/// Returning an error from any of the methods will prevent the connection from being -/// accepted by the transport manager. +/// It interacts with the transport manager at two stages: +/// +/// ## 1. Before Negotiation +/// +/// At this stage, the connection has not yet been negotiated. In the context of litep2p, +/// "negotiation" refers to the handshake and setup of `crypto/noise` (encryption and peer ID +/// validation) and `yamux` (multiplexing). +/// +/// The node is either attempting to establish an outbound connection or accept an inbound one. +/// +/// - Returning an error here will prevent the negotiation from proceeding, saving resources. +/// +/// - [`Self::outbound_capacity`] is called to determine the number of outbound +/// connections that can be established. The peerID is provided to further provide connection +/// details. +/// +/// - [`Self::check_inbound`] is called to evaluate whether an inbound connection can be accepted. +/// The peer ID is not yet known, but the socket address is provided to identify the connection. +/// +/// ## 2. After Negotiation +/// +/// At this point, the connection has been successfully negotiated and the peer ID is known. +/// +/// - [`Self::can_accept_connection`] is invoked to determine if the fully negotiated connection +/// should be accepted. The peer ID, endpoint, and connection ID are provided. Implementations +/// should check internal limits but **must not** store the connection ID or endpoint here, as the +/// transport manager might still reject the connection later. +/// +/// - If the connection is accepted, [`Self::on_connection_established`] is called with the same +/// peer ID and endpoint. At this point, implementations should begin tracking the connection ID. +/// +/// - When a connection is closed, [`Self::on_connection_closed`] is called. Implementations must +/// clean up any resources associated with the connection ID to prevent memory leaks. pub trait ConnectionMiddleware: Send { /// Determines the number of outbound connections permitted to be established. /// /// This method is called before the node attempts to dial a remote peer. /// /// Returns the number of allowed outbound connections. - /// If there is no limit, returns `Ok(usize::MAX)`. - /// If the node cannot accept any more outbound connections, returns an error. + /// - If there is no limit, returns `Ok(usize::MAX)`. + /// - If the node cannot accept any more outbound connections, returns an error. fn outbound_capacity(&mut self, peer: PeerId) -> crate::Result; /// Checks whether a new inbound connection can be accepted before processing it. From bfccff6f1b53770cfd3b5a6728cd78ea59e1c19a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 18:04:13 +0300 Subject: [PATCH 20/22] tests: Fix import warnings and identify decode Signed-off-by: Alexandru Vasile --- src/codec/identity.rs | 2 ++ src/protocol/libp2p/kademlia/mod.rs | 5 +---- src/protocol/mdns.rs | 5 +---- src/protocol/notification/tests/mod.rs | 5 +---- src/protocol/request_response/tests.rs | 5 +---- src/transport/manager/limits.rs | 13 ++----------- src/transport/tcp/mod.rs | 4 +--- 7 files changed, 9 insertions(+), 30 deletions(-) diff --git a/src/codec/identity.rs b/src/codec/identity.rs index 92ea7916..7bd91ca5 100644 --- a/src/codec/identity.rs +++ b/src/codec/identity.rs @@ -100,9 +100,11 @@ mod tests { fn decoding_smaller_payloads() { let mut codec = Identity::new(100); let bytes = vec![3u8; 64]; + let copy = bytes.clone(); let mut bytes = BytesMut::from(&bytes[..]); let decoded = codec.decode(&mut bytes); + assert_eq!(decoded, copy); } #[test] diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 17cf9180..e993aeac 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1230,10 +1230,7 @@ mod tests { use crate::{ codec::ProtocolCodec, crypto::ed25519::Keypair, - transport::{ - manager::{limits::ConnectionLimitsConfig, TransportManager}, - ConnectionLimits, KEEP_ALIVE_TIMEOUT, - }, + transport::{manager::TransportManager, ConnectionLimits, KEEP_ALIVE_TIMEOUT}, types::protocol::ProtocolName, BandwidthSink, }; diff --git a/src/protocol/mdns.rs b/src/protocol/mdns.rs index eb6d83bb..53538fae 100644 --- a/src/protocol/mdns.rs +++ b/src/protocol/mdns.rs @@ -336,10 +336,7 @@ mod tests { use super::*; use crate::{ crypto::ed25519::Keypair, - transport::{ - manager::{limits::ConnectionLimitsConfig, TransportManager}, - ConnectionLimits, - }, + transport::{manager::TransportManager, ConnectionLimits}, BandwidthSink, }; use futures::StreamExt; diff --git a/src/protocol/notification/tests/mod.rs b/src/protocol/notification/tests/mod.rs index d8279105..5b40dd3b 100644 --- a/src/protocol/notification/tests/mod.rs +++ b/src/protocol/notification/tests/mod.rs @@ -29,10 +29,7 @@ use crate::{ }, InnerTransportEvent, ProtocolCommand, TransportService, }, - transport::{ - manager::{limits::ConnectionLimitsConfig, TransportManager}, - ConnectionLimits, KEEP_ALIVE_TIMEOUT, - }, + transport::{manager::TransportManager, ConnectionLimits, KEEP_ALIVE_TIMEOUT}, types::protocol::ProtocolName, BandwidthSink, PeerId, }; diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 6d0d58f6..522269bb 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -29,10 +29,7 @@ use crate::{ InnerTransportEvent, SubstreamError, TransportService, }, substream::Substream, - transport::{ - manager::{limits::ConnectionLimitsConfig, TransportManager}, - ConnectionLimits, KEEP_ALIVE_TIMEOUT, - }, + transport::{manager::TransportManager, ConnectionLimits, KEEP_ALIVE_TIMEOUT}, types::{RequestId, SubstreamId}, BandwidthSink, Error, PeerId, ProtocolName, }; diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index d170d27a..e49feb64 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -279,11 +279,7 @@ mod tests { limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); - let err = limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(); - // assert_eq!( - // limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), - // ConnectionLimitsError::MaxIncomingConnectionsExceeded.into(), - // ); + assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_err()); assert_eq!(limits.incoming_connections.len(), 3); // Establish outgoing connection. @@ -304,12 +300,7 @@ mod tests { limits.on_connection_established(PeerId::random(), &endpoint); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!(limits.outgoing_connections.len(), 2); - - let err = limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(); - // assert_eq!( - // limits.can_accept_connection(PeerId::random(), &endpoint).unwrap_err(), - // ConnectionLimitsError::MaxOutgoingConnectionsExceeded.into(), - // ); + assert!(limits.can_accept_connection(PeerId::random(), &endpoint).is_err()); // Close connections with 1. limits.on_connection_closed(PeerId::random(), connection_id_in_1); diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 499b3111..27d105f3 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -685,9 +685,7 @@ mod tests { codec::ProtocolCodec, crypto::ed25519::Keypair, executor::DefaultExecutor, - transport::manager::{ - limits::ConnectionLimitsConfig, ProtocolContext, SupportedTransport, TransportManager, - }, + transport::manager::{ProtocolContext, SupportedTransport, TransportManager}, types::protocol::ProtocolName, BandwidthSink, PeerId, }; From de25dc634a6dda24e5c1557b82479d498579fe72 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 18:06:33 +0300 Subject: [PATCH 21/22] tests: Add comment wrt identity codec Signed-off-by: Alexandru Vasile --- src/codec/identity.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/codec/identity.rs b/src/codec/identity.rs index 7bd91ca5..5451b5b2 100644 --- a/src/codec/identity.rs +++ b/src/codec/identity.rs @@ -103,8 +103,8 @@ mod tests { let copy = bytes.clone(); let mut bytes = BytesMut::from(&bytes[..]); - let decoded = codec.decode(&mut bytes); - assert_eq!(decoded, copy); + // The smaller payload will not be decoded as the identity code needs 100 bytes. + assert!(codec.decode(&mut bytes).unwrap().is_none()); } #[test] From 1c2f4443a4004fa106cd68386aea9b93e1ba71aa Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Apr 2025 18:09:00 +0300 Subject: [PATCH 22/22] manager: Fix cargo docs Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index e49feb64..bcb48c95 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -102,14 +102,14 @@ pub trait ConnectionMiddleware: Send { /// Registers a connection as established. /// - /// This method will be called after a successful check using [`Self::check_connection`]. + /// This method will be called after a successful check using [`Self::can_accept_connection`]. /// The peer ID and endpoint are provided to identify the connection and are identical /// to the ones used in [`Self::can_accept_connection`]. fn on_connection_established(&mut self, peer: PeerId, endpoint: &Endpoint); /// Deregisters a connection when it is closed. /// - /// This method will be called after a [`Self::register_connection`] call. + /// This method will be called after a [`Self::on_connection_established`] call. /// The connection ID corresponds the endpoint provided in the /// [`Self::on_connection_established`] method. fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId);