From d50b33302e5fbea4bb342549ad3dc724a61bf449 Mon Sep 17 00:00:00 2001 From: iTranscend Date: Wed, 29 Oct 2025 14:32:53 +0100 Subject: [PATCH 1/6] feat: transition substream tests from using SubstreamSet to FuturesStream --- src/substream/mod.rs | 202 +++++++++++++++++-------------------------- 1 file changed, 81 insertions(+), 121 deletions(-) diff --git a/src/substream/mod.rs b/src/substream/mod.rs index f3d4615d..2d2a2168 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -833,8 +833,8 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let inner = Pin::into_inner(self); - for (key, mut substream) in inner.substreams.iter_mut() { - match Pin::new(&mut substream).poll_next(cx) { + for (key, substream) in inner.substreams.iter_mut() { + match Pin::new(substream).poll_next(cx) { Poll::Pending => continue, Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))), Poll::Ready(None) => @@ -849,58 +849,33 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{mock::substream::MockSubstream, PeerId}; - use futures::{SinkExt, StreamExt}; + use crate::{mock::substream::MockSubstream, utils::futures_stream::FuturesStream}; + use futures::StreamExt; #[test] fn add_substream() { - let mut set = SubstreamSet::::new(); + let mut set = FuturesStream::new(); - let peer = PeerId::random(); let substream = MockSubstream::new(); - set.insert(peer, substream); - - let peer = PeerId::random(); - let substream = MockSubstream::new(); - set.insert(peer, substream); - } - - #[test] - #[should_panic] - #[cfg(debug_assertions)] - fn add_same_peer_twice() { - let mut set = SubstreamSet::::new(); - - let peer = PeerId::random(); - let substream1 = MockSubstream::new(); - let substream2 = MockSubstream::new(); - - set.insert(peer, substream1); - set.insert(peer, substream2); - } - - #[test] - fn remove_substream() { - let mut set = SubstreamSet::::new(); - - let peer1 = PeerId::random(); - let substream1 = MockSubstream::new(); - set.insert(peer1, substream1); + let task = async |mut substream: MockSubstream| { + let request = match substream.next().await { + Some(Ok(request)) => Ok(request), + Some(Err(error)) => Err(error), + None => Err(SubstreamError::ConnectionClosed), + }; - let peer2 = PeerId::random(); - let substream2 = MockSubstream::new(); - set.insert(peer2, substream2); + (request, substream) + }; + set.push(task(substream)); - assert!(set.remove(&peer1).is_some()); - assert!(set.remove(&peer2).is_some()); - assert!(set.remove(&PeerId::random()).is_none()); + let substream = MockSubstream::new(); + set.push(task(substream)); } #[tokio::test] async fn poll_data_from_substream() { - let mut set = SubstreamSet::::new(); + let mut set = FuturesStream::new(); - let peer = PeerId::random(); let mut substream = MockSubstream::new(); substream .expect_poll_next() @@ -911,24 +886,32 @@ mod tests { .times(1) .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); substream.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer, substream); - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..])); + let task = async |mut substream: MockSubstream| { + let request = match substream.next().await { + Some(Ok(request)) => Ok(request), + Some(Err(error)) => Err(error), + None => Err(SubstreamError::ConnectionClosed), + }; + + (request, substream) + }; + + set.push(task(substream)); + let (value, substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..])); + set.push(task(substream)); + let (value, _substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"world"[..])); assert!(futures::poll!(set.next()).is_pending()); } #[tokio::test] async fn substream_closed() { - let mut set = SubstreamSet::::new(); + let mut set = FuturesStream::new(); - let peer = PeerId::random(); let mut substream = MockSubstream::new(); substream .expect_poll_next() @@ -936,65 +919,42 @@ mod tests { .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None)); substream.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer, substream); - - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..])); - - match set.next().await { - Some((exited_peer, Err(SubstreamError::ConnectionClosed))) => { - assert_eq!(peer, exited_peer); - } - _ => panic!("inavlid event received"), - } - } - - #[tokio::test] - async fn get_mut_substream() { - let _ = tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let mut set = SubstreamSet::::new(); - let peer = PeerId::random(); - let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream.expect_start_send().times(1).return_once(|_| Ok(())); - substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); - substream.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer, substream); + let task = async |mut substream: MockSubstream| { + let request = match substream.next().await { + Some(Ok(request)) => Ok(request), + Some(Err(error)) => Err(error), + None => Err(SubstreamError::ConnectionClosed), + }; - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..])); + (request, substream) + }; - let substream = set.get_mut(&peer).unwrap(); - substream.send(vec![1, 2, 3, 4].into()).await.unwrap(); + set.push(task(substream)); - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..])); + let (value, substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); - // try to get non-existent substream - assert!(set.get_mut(&PeerId::random()).is_none()); + set.push(task(substream)); + let (value, _substream) = set.next().await.unwrap(); + assert_eq!(value, Err(SubstreamError::ConnectionClosed)); } #[tokio::test] async fn poll_data_from_two_substreams() { - let mut set = SubstreamSet::::new(); + let mut set = FuturesStream::new(); + + let task = async |mut substream: MockSubstream| { + let request = match substream.next().await { + Some(Ok(request)) => Ok(request), + Some(Err(error)) => Err(error), + None => Err(SubstreamError::ConnectionClosed), + }; + + (request, substream) + }; // prepare first substream - let peer1 = PeerId::random(); let mut substream1 = MockSubstream::new(); substream1 .expect_poll_next() @@ -1005,10 +965,9 @@ mod tests { .times(1) .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); substream1.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer1, substream1); + set.push(task(substream1)); // prepare second substream - let peer2 = PeerId::random(); let mut substream2 = MockSubstream::new(); substream2 .expect_poll_next() @@ -1019,32 +978,32 @@ mod tests { .times(1) .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..]))))); substream2.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer2, substream2); + set.push(task(substream2)); - let expected: Vec> = vec![ + let expected: Vec> = vec![ vec![ - (peer1, BytesMut::from(&b"hello"[..])), - (peer1, BytesMut::from(&b"world"[..])), - (peer2, BytesMut::from(&b"siip"[..])), - (peer2, BytesMut::from(&b"huup"[..])), + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"world"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), ], vec![ - (peer1, BytesMut::from(&b"hello"[..])), - (peer2, BytesMut::from(&b"siip"[..])), - (peer1, BytesMut::from(&b"world"[..])), - (peer2, BytesMut::from(&b"huup"[..])), + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"world"[..]), + BytesMut::from(&b"huup"[..]), ], vec![ - (peer2, BytesMut::from(&b"siip"[..])), - (peer2, BytesMut::from(&b"huup"[..])), - (peer1, BytesMut::from(&b"hello"[..])), - (peer1, BytesMut::from(&b"world"[..])), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"world"[..]), ], vec![ - (peer1, BytesMut::from(&b"hello"[..])), - (peer2, BytesMut::from(&b"siip"[..])), - (peer2, BytesMut::from(&b"huup"[..])), - (peer1, BytesMut::from(&b"world"[..])), + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + BytesMut::from(&b"world"[..]), ], ]; @@ -1052,8 +1011,9 @@ mod tests { let mut values = Vec::new(); for _ in 0..4 { - let value = set.next().await.unwrap(); - values.push((value.0, value.1.unwrap())); + let (value, substream) = set.next().await.unwrap(); + values.push(value.unwrap()); + set.push(task(substream)); } let mut correct_found = false; From 7c620995821066d84c09dfe66092b40a094eb676 Mon Sep 17 00:00:00 2001 From: iTranscend Date: Wed, 29 Oct 2025 14:33:13 +0100 Subject: [PATCH 2/6] style: doc typos --- src/protocol/request_response/mod.rs | 4 ++-- src/protocol/transport_service.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index d763fa64..6b6221f9 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -146,7 +146,7 @@ pub(crate) struct RequestResponseProtocol { /// Pending outbound responses. /// /// The future listens to a `oneshot::Sender` which is given to `RequestResponseHandle`. - /// If the request is accepted by the local node, the response is sent over the channel to the + /// If the request is accepted by the local node, the response is sent over the channel to /// the future which sends it to remote peer and closes the substream. /// /// If the substream is rejected by the local node, the `oneshot::Sender` is dropped which @@ -457,7 +457,7 @@ impl RequestResponseProtocol { Ok(()) } - /// Handle pending inbound response. + /// Handle pending inbound request. async fn on_inbound_request( &mut self, peer: PeerId, diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index b729e931..0b4186ac 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -260,7 +260,7 @@ impl Stream for KeepAliveTracker { } } -/// Provides an interfaces for [`Litep2p`](crate::Litep2p) protocols to interact +/// Provides an interface for [`Litep2p`](crate::Litep2p) protocols to interact /// with the underlying transport protocols. #[derive(Debug)] pub struct TransportService { @@ -279,7 +279,7 @@ pub struct TransportService { /// Transport handle. transport_handle: TransportManagerHandle, - /// RX channel for receiving events from tranports and connections. + /// RX channel for receiving events from transports and connections. rx: Receiver, /// Next substream ID. From 3798ac5947495ce84bdb180c4c0636bd00f63084 Mon Sep 17 00:00:00 2001 From: iTranscend Date: Thu, 30 Oct 2025 22:01:44 +0100 Subject: [PATCH 3/6] refactor: move substream data extraction task into helper function --- src/substream/mod.rs | 69 ++++++++++++++------------------------------ 1 file changed, 21 insertions(+), 48 deletions(-) diff --git a/src/substream/mod.rs b/src/substream/mod.rs index 2d2a2168..b9ca7cdd 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -852,24 +852,27 @@ mod tests { use crate::{mock::substream::MockSubstream, utils::futures_stream::FuturesStream}; use futures::StreamExt; + async fn get_data_from_substream( + mut substream: MockSubstream, + ) -> (Result, MockSubstream) { + let request = match substream.next().await { + Some(Ok(request)) => Ok(request), + Some(Err(error)) => Err(error), + None => Err(SubstreamError::ConnectionClosed), + }; + + (request, substream) + } + #[test] fn add_substream() { let mut set = FuturesStream::new(); let substream = MockSubstream::new(); - let task = async |mut substream: MockSubstream| { - let request = match substream.next().await { - Some(Ok(request)) => Ok(request), - Some(Err(error)) => Err(error), - None => Err(SubstreamError::ConnectionClosed), - }; - - (request, substream) - }; - set.push(task(substream)); + set.push(get_data_from_substream(substream)); let substream = MockSubstream::new(); - set.push(task(substream)); + set.push(get_data_from_substream(substream)); } #[tokio::test] @@ -887,21 +890,11 @@ mod tests { .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); substream.expect_poll_next().returning(|_| Poll::Pending); - let task = async |mut substream: MockSubstream| { - let request = match substream.next().await { - Some(Ok(request)) => Ok(request), - Some(Err(error)) => Err(error), - None => Err(SubstreamError::ConnectionClosed), - }; - - (request, substream) - }; - - set.push(task(substream)); + set.push(get_data_from_substream(substream)); let (value, substream) = set.next().await.unwrap(); assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); - set.push(task(substream)); + set.push(get_data_from_substream(substream)); let (value, _substream) = set.next().await.unwrap(); assert_eq!(value.unwrap(), BytesMut::from(&b"world"[..])); @@ -920,22 +913,12 @@ mod tests { substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None)); substream.expect_poll_next().returning(|_| Poll::Pending); - let task = async |mut substream: MockSubstream| { - let request = match substream.next().await { - Some(Ok(request)) => Ok(request), - Some(Err(error)) => Err(error), - None => Err(SubstreamError::ConnectionClosed), - }; - - (request, substream) - }; - - set.push(task(substream)); + set.push(get_data_from_substream(substream)); let (value, substream) = set.next().await.unwrap(); assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); - set.push(task(substream)); + set.push(get_data_from_substream(substream)); let (value, _substream) = set.next().await.unwrap(); assert_eq!(value, Err(SubstreamError::ConnectionClosed)); } @@ -944,16 +927,6 @@ mod tests { async fn poll_data_from_two_substreams() { let mut set = FuturesStream::new(); - let task = async |mut substream: MockSubstream| { - let request = match substream.next().await { - Some(Ok(request)) => Ok(request), - Some(Err(error)) => Err(error), - None => Err(SubstreamError::ConnectionClosed), - }; - - (request, substream) - }; - // prepare first substream let mut substream1 = MockSubstream::new(); substream1 @@ -965,7 +938,7 @@ mod tests { .times(1) .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); substream1.expect_poll_next().returning(|_| Poll::Pending); - set.push(task(substream1)); + set.push(get_data_from_substream(substream1)); // prepare second substream let mut substream2 = MockSubstream::new(); @@ -978,7 +951,7 @@ mod tests { .times(1) .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..]))))); substream2.expect_poll_next().returning(|_| Poll::Pending); - set.push(task(substream2)); + set.push(get_data_from_substream(substream2)); let expected: Vec> = vec![ vec![ @@ -1013,7 +986,7 @@ mod tests { for _ in 0..4 { let (value, substream) = set.next().await.unwrap(); values.push(value.unwrap()); - set.push(task(substream)); + set.push(get_data_from_substream(substream)); } let mut correct_found = false; From e1339f81d7bc1f4577742fbad989ac849ef018d4 Mon Sep 17 00:00:00 2001 From: iTranscend Date: Fri, 31 Oct 2025 14:03:07 +0100 Subject: [PATCH 4/6] docs(rustdoc): fix unresolved PeerState link --- src/protocol/notification/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index 363adcfa..52b82157 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -68,7 +68,7 @@ const LOG_TARGET: &str = "litep2p::notification"; /// Connection state. /// /// Used to track transport level connectivity state when there is a pending validation. -/// See [`PeerState::PendingValidation.`] for more details. +/// See [`PeerState::ValidationPending.`] for more details. #[derive(Debug, PartialEq, Eq)] enum ConnectionState { /// There is a active, transport-level connection open to the peer. From be0a7d9c85500628ae17462326c8bb16b3bcc3e6 Mon Sep 17 00:00:00 2001 From: iTranscend Date: Fri, 31 Oct 2025 17:22:56 +0100 Subject: [PATCH 5/6] refactor: relocate FuturesStream tests to futures_stream file --- src/substream/mod.rs | 163 ----------------------------------- src/utils/futures_stream.rs | 166 ++++++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+), 163 deletions(-) diff --git a/src/substream/mod.rs b/src/substream/mod.rs index b9ca7cdd..02602848 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -845,166 +845,3 @@ where Poll::Pending } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::{mock::substream::MockSubstream, utils::futures_stream::FuturesStream}; - use futures::StreamExt; - - async fn get_data_from_substream( - mut substream: MockSubstream, - ) -> (Result, MockSubstream) { - let request = match substream.next().await { - Some(Ok(request)) => Ok(request), - Some(Err(error)) => Err(error), - None => Err(SubstreamError::ConnectionClosed), - }; - - (request, substream) - } - - #[test] - fn add_substream() { - let mut set = FuturesStream::new(); - - let substream = MockSubstream::new(); - set.push(get_data_from_substream(substream)); - - let substream = MockSubstream::new(); - set.push(get_data_from_substream(substream)); - } - - #[tokio::test] - async fn poll_data_from_substream() { - let mut set = FuturesStream::new(); - - let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); - substream.expect_poll_next().returning(|_| Poll::Pending); - - set.push(get_data_from_substream(substream)); - let (value, substream) = set.next().await.unwrap(); - assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); - - set.push(get_data_from_substream(substream)); - let (value, _substream) = set.next().await.unwrap(); - assert_eq!(value.unwrap(), BytesMut::from(&b"world"[..])); - - assert!(futures::poll!(set.next()).is_pending()); - } - - #[tokio::test] - async fn substream_closed() { - let mut set = FuturesStream::new(); - - let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None)); - substream.expect_poll_next().returning(|_| Poll::Pending); - - set.push(get_data_from_substream(substream)); - - let (value, substream) = set.next().await.unwrap(); - assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); - - set.push(get_data_from_substream(substream)); - let (value, _substream) = set.next().await.unwrap(); - assert_eq!(value, Err(SubstreamError::ConnectionClosed)); - } - - #[tokio::test] - async fn poll_data_from_two_substreams() { - let mut set = FuturesStream::new(); - - // prepare first substream - let mut substream1 = MockSubstream::new(); - substream1 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream1 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); - substream1.expect_poll_next().returning(|_| Poll::Pending); - set.push(get_data_from_substream(substream1)); - - // prepare second substream - let mut substream2 = MockSubstream::new(); - substream2 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"siip"[..]))))); - substream2 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..]))))); - substream2.expect_poll_next().returning(|_| Poll::Pending); - set.push(get_data_from_substream(substream2)); - - let expected: Vec> = vec![ - vec![ - BytesMut::from(&b"hello"[..]), - BytesMut::from(&b"world"[..]), - BytesMut::from(&b"siip"[..]), - BytesMut::from(&b"huup"[..]), - ], - vec![ - BytesMut::from(&b"hello"[..]), - BytesMut::from(&b"siip"[..]), - BytesMut::from(&b"world"[..]), - BytesMut::from(&b"huup"[..]), - ], - vec![ - BytesMut::from(&b"siip"[..]), - BytesMut::from(&b"huup"[..]), - BytesMut::from(&b"hello"[..]), - BytesMut::from(&b"world"[..]), - ], - vec![ - BytesMut::from(&b"hello"[..]), - BytesMut::from(&b"siip"[..]), - BytesMut::from(&b"huup"[..]), - BytesMut::from(&b"world"[..]), - ], - ]; - - // poll values - let mut values = Vec::new(); - - for _ in 0..4 { - let (value, substream) = set.next().await.unwrap(); - values.push(value.unwrap()); - set.push(get_data_from_substream(substream)); - } - - let mut correct_found = false; - - for set in expected { - if values == set { - correct_found = true; - break; - } - } - - if !correct_found { - panic!("invalid set generated"); - } - - // rest of the calls return `Poll::Pending` - for _ in 0..10 { - assert!(futures::poll!(set.next()).is_pending()); - } - } -} diff --git a/src/utils/futures_stream.rs b/src/utils/futures_stream.rs index 7f134794..06c7df4d 100644 --- a/src/utils/futures_stream.rs +++ b/src/utils/futures_stream.rs @@ -84,3 +84,169 @@ impl Stream for FuturesStream { Poll::Ready(Some(result)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + error::SubstreamError, mock::substream::MockSubstream, utils::futures_stream::FuturesStream, + }; + use bytes::BytesMut; + use futures::StreamExt; + + async fn get_data_from_substream( + mut substream: MockSubstream, + ) -> (Result, MockSubstream) { + let request = match substream.next().await { + Some(Ok(request)) => Ok(request), + Some(Err(error)) => Err(error), + None => Err(SubstreamError::ConnectionClosed), + }; + + (request, substream) + } + + #[test] + fn add_substream() { + let mut set = FuturesStream::new(); + + let substream = MockSubstream::new(); + set.push(get_data_from_substream(substream)); + + let substream = MockSubstream::new(); + set.push(get_data_from_substream(substream)); + } + + #[tokio::test] + async fn poll_data_from_substream() { + let mut set = FuturesStream::new(); + + let mut substream = MockSubstream::new(); + substream + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); + substream + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); + substream.expect_poll_next().returning(|_| Poll::Pending); + + set.push(get_data_from_substream(substream)); + let (value, substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); + + set.push(get_data_from_substream(substream)); + let (value, _substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"world"[..])); + + assert!(futures::poll!(set.next()).is_pending()); + } + + #[tokio::test] + async fn substream_closed() { + let mut set = FuturesStream::new(); + + let mut substream = MockSubstream::new(); + substream + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); + substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None)); + substream.expect_poll_next().returning(|_| Poll::Pending); + + set.push(get_data_from_substream(substream)); + + let (value, substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); + + set.push(get_data_from_substream(substream)); + let (value, _substream) = set.next().await.unwrap(); + assert_eq!(value, Err(SubstreamError::ConnectionClosed)); + } + + #[tokio::test] + async fn poll_data_from_two_substreams() { + let mut set = FuturesStream::new(); + + // prepare first substream + let mut substream1 = MockSubstream::new(); + substream1 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); + substream1 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); + substream1.expect_poll_next().returning(|_| Poll::Pending); + set.push(get_data_from_substream(substream1)); + + // prepare second substream + let mut substream2 = MockSubstream::new(); + substream2 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"siip"[..]))))); + substream2 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..]))))); + substream2.expect_poll_next().returning(|_| Poll::Pending); + set.push(get_data_from_substream(substream2)); + + let expected: Vec> = vec![ + vec![ + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"world"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + ], + vec![ + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"world"[..]), + BytesMut::from(&b"huup"[..]), + ], + vec![ + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"world"[..]), + ], + vec![ + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + BytesMut::from(&b"world"[..]), + ], + ]; + + // poll values + let mut values = Vec::new(); + + for _ in 0..4 { + let (value, substream) = set.next().await.unwrap(); + values.push(value.unwrap()); + set.push(get_data_from_substream(substream)); + } + + let mut correct_found = false; + + for set in expected { + if values == set { + correct_found = true; + break; + } + } + + if !correct_found { + panic!("invalid set generated"); + } + + // rest of the calls return `Poll::Pending` + for _ in 0..10 { + assert!(futures::poll!(set.next()).is_pending()); + } + } +} From bceb2709624072d6487c8119b86d3f20d7dc0756 Mon Sep 17 00:00:00 2001 From: iTranscend Date: Fri, 31 Oct 2025 20:53:22 +0100 Subject: [PATCH 6/6] fix doc comment --- src/protocol/notification/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index 52b82157..328ce1ac 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -68,7 +68,7 @@ const LOG_TARGET: &str = "litep2p::notification"; /// Connection state. /// /// Used to track transport level connectivity state when there is a pending validation. -/// See [`PeerState::ValidationPending.`] for more details. +/// See [`PeerState::ValidationPending`] for more details. #[derive(Debug, PartialEq, Eq)] enum ConnectionState { /// There is a active, transport-level connection open to the peer.