diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index c724dff6..6fbe2a09 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -209,52 +209,6 @@ impl TcpConnection { } } - /// Open connection to remote peer at `address`. - // TODO: https://github.com/paritytech/litep2p/issues/347 this function can be removed - pub(super) async fn open_connection( - connection_id: ConnectionId, - keypair: Keypair, - stream: TcpStream, - address: AddressType, - peer: Option, - yamux_config: crate::yamux::Config, - max_read_ahead_factor: usize, - max_write_buffer_size: usize, - connection_open_timeout: Duration, - substream_open_timeout: Duration, - ) -> Result { - tracing::debug!( - target: LOG_TARGET, - ?address, - ?peer, - "open connection to remote peer", - ); - - match tokio::time::timeout(connection_open_timeout, async move { - Self::negotiate_connection( - stream, - peer, - connection_id, - keypair, - Role::Dialer, - address, - yamux_config, - max_read_ahead_factor, - max_write_buffer_size, - substream_open_timeout, - ) - .await - }) - .await - { - Err(_) => { - tracing::trace!(target: LOG_TARGET, ?connection_id, "connection timed out during negotiation"); - Err(NegotiationError::Timeout) - } - Ok(result) => result, - } - } - /// Open substream for `protocol`. pub(super) async fn open_substream( mut control: crate::yamux::Control, @@ -317,26 +271,20 @@ impl TcpConnection { ) -> Result { tracing::debug!(target: LOG_TARGET, ?address, "accept connection"); - match tokio::time::timeout(connection_open_timeout, async move { - Self::negotiate_connection( - stream, - None, - connection_id, - keypair, - Role::Listener, - AddressType::Socket(address), - yamux_config, - max_read_ahead_factor, - max_write_buffer_size, - substream_open_timeout, - ) - .await - }) + Self::negotiate_connection( + stream, + None, + connection_id, + keypair, + Role::Listener, + AddressType::Socket(address), + yamux_config, + max_read_ahead_factor, + max_write_buffer_size, + connection_open_timeout, + substream_open_timeout, + ) .await - { - Err(_) => Err(NegotiationError::Timeout), - Ok(result) => result, - } } /// Accept substream. @@ -410,89 +358,107 @@ impl TcpConnection { yamux_config: crate::yamux::Config, max_read_ahead_factor: usize, max_write_buffer_size: usize, + connection_open_timeout: Duration, substream_open_timeout: Duration, ) -> Result { tracing::trace!( target: LOG_TARGET, + ?address, + ?dialed_peer, ?role, - "negotiate connection", + "negotiate connection to remote peer", ); - let stream = TokioAsyncReadCompatExt::compat(stream).into_inner(); - let stream = TokioAsyncWriteCompatExt::compat_write(stream); + let negotiate_task = async move { + let stream = TokioAsyncReadCompatExt::compat(stream).into_inner(); + let stream = TokioAsyncWriteCompatExt::compat_write(stream); - // negotiate `noise` - let (stream, _) = - Self::negotiate_protocol(stream, &role, vec!["/noise"], substream_open_timeout).await?; + // negotiate `noise` + let (stream, _) = + Self::negotiate_protocol(stream, &role, vec!["/noise"], substream_open_timeout) + .await?; - tracing::trace!( - target: LOG_TARGET, - "`multistream-select` and `noise` negotiated", - ); + tracing::trace!( + target: LOG_TARGET, + "`multistream-select` and `noise` negotiated", + ); - // perform noise handshake - let (stream, peer) = noise::handshake( - stream.inner(), - &keypair, - role, - max_read_ahead_factor, - max_write_buffer_size, - substream_open_timeout, - noise::HandshakeTransport::Tcp, - ) - .await?; + // perform noise handshake + let (stream, peer) = noise::handshake( + stream.inner(), + &keypair, + role, + max_read_ahead_factor, + max_write_buffer_size, + substream_open_timeout, + noise::HandshakeTransport::Tcp, + ) + .await?; - if let Some(dialed_peer) = dialed_peer { - if dialed_peer != peer { - tracing::debug!(target: LOG_TARGET, ?dialed_peer, ?peer, "peer id mismatch"); - return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer)); + if let Some(dialed_peer) = dialed_peer { + if dialed_peer != peer { + tracing::debug!(target: LOG_TARGET, ?dialed_peer, ?peer, "peer id mismatch"); + return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer)); + } } - } - tracing::trace!(target: LOG_TARGET, "noise handshake done"); - let stream: NoiseSocket> = stream; - - // negotiate `yamux` - let (stream, _) = - Self::negotiate_protocol(stream, &role, vec!["/yamux/1.0.0"], substream_open_timeout) - .await?; - tracing::trace!(target: LOG_TARGET, "`yamux` negotiated"); + tracing::trace!(target: LOG_TARGET, "noise handshake done"); + let stream: NoiseSocket> = stream; - let connection = crate::yamux::Connection::new(stream.inner(), yamux_config, role.into()); - let (control, connection) = crate::yamux::Control::new(connection); + // negotiate `yamux` + let (stream, _) = Self::negotiate_protocol( + stream, + &role, + vec!["/yamux/1.0.0"], + substream_open_timeout, + ) + .await?; + tracing::trace!(target: LOG_TARGET, "`yamux` negotiated"); + + let connection = + crate::yamux::Connection::new(stream.inner(), yamux_config, role.into()); + let (control, connection) = crate::yamux::Control::new(connection); + + let address = match address { + AddressType::Socket(address) => Multiaddr::empty() + .with(Protocol::from(address.ip())) + .with(Protocol::Tcp(address.port())), + AddressType::Dns { + address, + port, + dns_type, + } => match dns_type { + DnsType::Dns => Multiaddr::empty() + .with(Protocol::Dns(Cow::Owned(address))) + .with(Protocol::Tcp(port)), + DnsType::Dns4 => Multiaddr::empty() + .with(Protocol::Dns4(Cow::Owned(address))) + .with(Protocol::Tcp(port)), + DnsType::Dns6 => Multiaddr::empty() + .with(Protocol::Dns6(Cow::Owned(address))) + .with(Protocol::Tcp(port)), + }, + }; + let endpoint = match role { + Role::Dialer => Endpoint::dialer(address, connection_id), + Role::Listener => Endpoint::listener(address, connection_id), + }; - let address = match address { - AddressType::Socket(address) => Multiaddr::empty() - .with(Protocol::from(address.ip())) - .with(Protocol::Tcp(address.port())), - AddressType::Dns { - address, - port, - dns_type, - } => match dns_type { - DnsType::Dns => Multiaddr::empty() - .with(Protocol::Dns(Cow::Owned(address))) - .with(Protocol::Tcp(port)), - DnsType::Dns4 => Multiaddr::empty() - .with(Protocol::Dns4(Cow::Owned(address))) - .with(Protocol::Tcp(port)), - DnsType::Dns6 => Multiaddr::empty() - .with(Protocol::Dns6(Cow::Owned(address))) - .with(Protocol::Tcp(port)), - }, + Ok(NegotiatedConnection { + peer, + control, + connection, + endpoint, + substream_open_timeout, + }) }; - let endpoint = match role { - Role::Dialer => Endpoint::dialer(address, connection_id), - Role::Listener => Endpoint::listener(address, connection_id), + + let Ok(result) = tokio::time::timeout(connection_open_timeout, negotiate_task).await else { + tracing::trace!(target: LOG_TARGET, ?connection_id, "connection timed out during negotiation"); + return Err(NegotiationError::Timeout); }; - Ok(NegotiatedConnection { - peer, - control, - connection, - endpoint, - substream_open_timeout, - }) + result } /// Handles the yamux substream. @@ -787,12 +753,13 @@ mod tests { .await .unwrap(); - match TcpConnection::open_connection( + match TcpConnection::negotiate_connection( + stream, + None, ConnectionId::from(0usize), Keypair::generate(), - stream, + Role::Dialer, AddressType::Socket(address), - None, Default::default(), 5, 2, @@ -889,12 +856,13 @@ mod tests { .await .unwrap(); - match TcpConnection::open_connection( + match TcpConnection::negotiate_connection( + stream, + None, ConnectionId::from(0usize), Keypair::generate(), - stream, + Role::Dialer, AddressType::Socket(address), - None, Default::default(), 5, 2, @@ -1038,12 +1006,13 @@ mod tests { .await .unwrap(); - match TcpConnection::open_connection( + match TcpConnection::negotiate_connection( + stream, + None, ConnectionId::from(0usize), Keypair::generate(), - stream, + Role::Dialer, AddressType::Socket(address), - None, Default::default(), 5, 2, @@ -1091,12 +1060,13 @@ mod tests { .await .unwrap(); - match TcpConnection::open_connection( + match TcpConnection::negotiate_connection( + stream, + None, ConnectionId::from(0usize), Keypair::generate(), - stream, + Role::Dialer, AddressType::Socket(address), - None, Default::default(), 5, 2, @@ -1271,12 +1241,13 @@ mod tests { .await .unwrap(); - match TcpConnection::open_connection( + match TcpConnection::negotiate_connection( + stream, + None, ConnectionId::from(0usize), Keypair::generate(), - stream, + Role::Dialer, AddressType::Socket(address), - None, Default::default(), 5, 2, @@ -1406,12 +1377,13 @@ mod tests { .await .unwrap(); - match TcpConnection::open_connection( + match TcpConnection::negotiate_connection( + stream, + None, ConnectionId::from(0usize), Keypair::generate(), - stream, + Role::Dialer, AddressType::Socket(address), - None, Default::default(), 5, 2, diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index a97fa417..e7746010 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -341,12 +341,13 @@ impl Transport for TcpTransport { .await .map_err(|error| (connection_id, error))?; - TcpConnection::open_connection( + TcpConnection::negotiate_connection( + stream, + peer, connection_id, keypair, - stream, + Role::Dialer, socket_address, - peer, yamux_config, max_read_ahead_factor, max_write_buffer_size, @@ -530,6 +531,7 @@ impl Transport for TcpTransport { yamux_config, max_read_ahead_factor, max_write_buffer_size, + connection_open_timeout, substream_open_timeout, ) .await