From b382476aa06ffc95bb281b6dc6539db28b1c10b2 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Mon, 28 Jul 2025 16:39:14 +0800 Subject: [PATCH 1/9] mdns expiry --- iroh/examples/locally-discovered-nodes.rs | 5 +- iroh/src/discovery.rs | 91 +++++++++++++++-------- iroh/src/discovery/dns.rs | 11 ++- iroh/src/discovery/mdns.rs | 57 ++++++++------ iroh/src/discovery/pkarr.rs | 9 ++- iroh/src/discovery/pkarr/dht.rs | 12 ++- iroh/src/discovery/static_provider.rs | 6 +- iroh/src/endpoint.rs | 4 +- iroh/src/magicsock.rs | 23 +++--- iroh/tests/integration.rs | 4 +- 10 files changed, 141 insertions(+), 81 deletions(-) diff --git a/iroh/examples/locally-discovered-nodes.rs b/iroh/examples/locally-discovered-nodes.rs index 8eae4679d13..5257f6b1283 100644 --- a/iroh/examples/locally-discovered-nodes.rs +++ b/iroh/examples/locally-discovered-nodes.rs @@ -5,7 +5,7 @@ //! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`MdnsDiscovery`] enabled, it may discover those nodes as well. use std::time::Duration; -use iroh::{Endpoint, NodeId, node_info::UserData}; +use iroh::{Endpoint, NodeId, discovery::DiscoveryEvent, node_info::UserData}; use n0_future::StreamExt; use n0_snafu::Result; use tokio::task::JoinSet; @@ -32,7 +32,7 @@ async fn main() -> Result<()> { tracing::error!("{e}"); return; } - Ok(item) => { + Ok(DiscoveryEvent::Discovered(item)) => { // if there is no user data, or the user data // does not indicate that the discovered node // is a part of the example, ignore it @@ -53,6 +53,7 @@ async fn main() -> Result<()> { println!("Found node {}!", item.node_id().fmt_short()); } } + Ok(DiscoveryEvent::Expired(_)) => {} }; } }); diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 8da9a4e78a6..ff815d066f2 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -326,7 +326,7 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static { fn resolve( &self, _node_id: NodeId, - ) -> Option>> { + ) -> Option>> { None } @@ -352,13 +352,23 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static { /// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system /// and add the discovered addresses to the internal address book as they arrive /// on this stream. - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { None } } impl Discovery for Arc {} +/// An event emitted from [`Discovery`] services. +#[derive(Debug, Clone)] +pub enum DiscoveryEvent { + /// A peer was discovered or it's information was updated. + Discovered(DiscoveryItem), + /// A peer was expired due to being inactive, unreachable, or otherwise + /// unavailable. + Expired(NodeId), +} + /// Node discovery results from [`Discovery`] services. /// /// This is the item in the streams returned from [`Discovery::resolve`] and @@ -488,7 +498,10 @@ impl Discovery for ConcurrentDiscovery { } } - fn resolve(&self, node_id: NodeId) -> Option>> { + fn resolve( + &self, + node_id: NodeId, + ) -> Option>> { let streams = self .services .iter() @@ -498,7 +511,7 @@ impl Discovery for ConcurrentDiscovery { Some(Box::pin(streams)) } - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { let mut streams = vec![]; for service in self.services.iter() { if let Some(stream) = service.subscribe() { @@ -592,7 +605,7 @@ impl DiscoveryTask { fn create_stream( ep: &Endpoint, node_id: NodeId, - ) -> Result>, DiscoveryError> { + ) -> Result>, DiscoveryError> { let discovery = ep.discovery().ok_or(NoServiceConfiguredSnafu.build())?; let stream = discovery .resolve(node_id) @@ -641,14 +654,17 @@ impl DiscoveryTask { loop { match stream.next().await { Some(Ok(r)) => { - let provenance = r.provenance; - let node_addr = r.to_node_addr(); - if node_addr.is_empty() { - debug!(%provenance, "empty address found"); - continue; + if let DiscoveryEvent::Discovered(r) = &r { + let provenance = r.provenance; + let node_addr = r.to_node_addr(); + if node_addr.is_empty() { + debug!(%provenance, "empty address found"); + continue; + } + debug!(%provenance, addr = ?node_addr, "new address found"); + ep.add_node_addr_with_source(node_addr, provenance).ok(); } - debug!(%provenance, addr = ?node_addr, "new address found"); - ep.add_node_addr_with_source(node_addr, provenance).ok(); + if let Some(tx) = on_first_tx.take() { tx.send(Ok(())).ok(); } @@ -685,7 +701,7 @@ pub struct Lagged { #[derive(Clone, Debug)] pub(super) struct DiscoverySubscribers { - inner: tokio::sync::broadcast::Sender, + inner: tokio::sync::broadcast::Sender, } impl DiscoverySubscribers { @@ -699,13 +715,13 @@ impl DiscoverySubscribers { } } - pub(crate) fn subscribe(&self) -> impl Stream> + use<> { + pub(crate) fn subscribe(&self) -> impl Stream> + use<> { use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; let recv = self.inner.subscribe(); BroadcastStream::new(recv).map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged { val: n }) } - pub(crate) fn send(&self, item: DiscoveryItem) { + pub(crate) fn send(&self, item: DiscoveryEvent) { // `broadcast::Sender::send` returns an error if the channel has no subscribers, // which we don't care about. self.inner.send(item).ok(); @@ -737,7 +753,7 @@ mod tests { #[derive(Debug, Clone)] struct TestDiscoveryShared { nodes: Arc>, - watchers: tokio::sync::broadcast::Sender, + watchers: tokio::sync::broadcast::Sender, } impl Default for TestDiscoveryShared { @@ -770,7 +786,7 @@ mod tests { } } - pub fn send_passive(&self, item: DiscoveryItem) { + pub fn send_passive(&self, item: DiscoveryEvent) { self.watchers.send(item).ok(); } } @@ -800,7 +816,7 @@ mod tests { fn resolve( &self, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let addr_info = if self.resolve_wrong { let ts = system_time_now() - 100_000; let port: u16 = rand::thread_rng().gen_range(10_000..20_000); @@ -822,7 +838,7 @@ mod tests { let fut = async move { time::sleep(delay).await; tracing::debug!("resolve: {} = {item:?}", node_id.fmt_short()); - Ok(item) + Ok(DiscoveryEvent::Discovered(item)) }; n0_future::stream::once_future(fut).boxed() } @@ -831,7 +847,7 @@ mod tests { Some(stream) } - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { let recv = self.shared.watchers.subscribe(); let stream = tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|item| item.ok()); @@ -848,7 +864,7 @@ mod tests { fn resolve( &self, _node_id: NodeId, - ) -> Option>> { + ) -> Option>> { Some(n0_future::stream::empty().boxed()) } } @@ -1019,11 +1035,16 @@ mod tests { ep2.node_addr().initialized().await?; let _ = ep1.connect(ep2.node_id(), TEST_ALPN).await?; - let item = tokio::time::timeout(Duration::from_secs(1), stream.next()) - .await - .expect("timeout") - .expect("stream closed") - .expect("stream lagged"); + let DiscoveryEvent::Discovered(item) = + tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .expect("timeout") + .expect("stream closed") + .expect("stream lagged") + else { + panic!("Returned unexpected discovery event!"); + }; + assert_eq!(item.node_id(), ep2.node_id()); assert_eq!(item.provenance(), "test-disco"); @@ -1031,13 +1052,17 @@ mod tests { let passive_node_id = SecretKey::generate(rand::thread_rng()).public(); let node_info = NodeInfo::new(passive_node_id); let passive_item = DiscoveryItem::new(node_info, "test-disco-passive", None); - disco_shared.send_passive(passive_item.clone()); - - let item = tokio::time::timeout(Duration::from_secs(1), stream.next()) - .await - .expect("timeout") - .expect("stream closed") - .expect("stream lagged"); + disco_shared.send_passive(DiscoveryEvent::Discovered(passive_item.clone())); + + let DiscoveryEvent::Discovered(item) = + tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .expect("timeout") + .expect("stream closed") + .expect("stream lagged") + else { + panic!("Returned unexpected discovery event!"); + }; assert_eq!(item.node_id(), passive_node_id); assert_eq!(item.provenance(), "test-disco-passive"); diff --git a/iroh/src/discovery/dns.rs b/iroh/src/discovery/dns.rs index 24f629b85bc..7ab9db538db 100644 --- a/iroh/src/discovery/dns.rs +++ b/iroh/src/discovery/dns.rs @@ -5,7 +5,7 @@ use iroh_relay::dns::DnsResolver; pub use iroh_relay::dns::{N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}; use n0_future::boxed::BoxStream; -use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError}; +use super::{DiscoveryContext, DiscoveryError, DiscoveryEvent, IntoDiscovery, IntoDiscoveryError}; use crate::{ discovery::{Discovery, DiscoveryItem}, endpoint::force_staging_infra, @@ -105,7 +105,10 @@ impl IntoDiscovery for DnsDiscoveryBuilder { } impl Discovery for DnsDiscovery { - fn resolve(&self, node_id: NodeId) -> Option>> { + fn resolve( + &self, + node_id: NodeId, + ) -> Option>> { let resolver = self.dns_resolver.clone(); let origin_domain = self.origin_domain.clone(); let fut = async move { @@ -113,7 +116,9 @@ impl Discovery for DnsDiscovery { .lookup_node_by_id_staggered(&node_id, &origin_domain, DNS_STAGGERING_MS) .await .map_err(|e| DiscoveryError::from_err("dns", e))?; - Ok(DiscoveryItem::new(node_info, "dns", None)) + Ok(DiscoveryEvent::Discovered(DiscoveryItem::new( + node_info, "dns", None, + ))) }; let stream = n0_future::stream::once_future(fut); Some(Box::pin(stream)) diff --git a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index 96c7607856f..0e0aa2a4171 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -48,7 +48,7 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{Instrument, debug, error, info_span, trace, warn}; use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError}; -use crate::discovery::{Discovery, DiscoveryItem, NodeData, NodeInfo}; +use crate::discovery::{Discovery, DiscoveryEvent, DiscoveryItem, NodeData, NodeInfo}; /// The n0 local swarm node discovery name const N0_LOCAL_SWARM: &str = "iroh.local.swarm"; @@ -80,14 +80,14 @@ pub struct MdnsDiscovery { #[derive(Debug)] enum Message { Discovery(String, Peer), - Resolve(NodeId, mpsc::Sender>), + Resolve(NodeId, mpsc::Sender>), Timeout(NodeId, usize), - Subscribe(mpsc::Sender), + Subscribe(mpsc::Sender), } /// Manages the list of subscribers that are subscribed to this discovery service. #[derive(Debug)] -struct Subscribers(Vec>); +struct Subscribers(Vec>); impl Subscribers { fn new() -> Self { @@ -95,14 +95,14 @@ impl Subscribers { } /// Add the subscriber to the list of subscribers - fn push(&mut self, subscriber: mpsc::Sender) { + fn push(&mut self, subscriber: mpsc::Sender) { self.0.push(subscriber); } /// Sends the `node_id` and `item` to each subscriber. /// /// Cleans up any subscribers that have been dropped. - fn send(&mut self, item: DiscoveryItem) { + fn send(&mut self, item: DiscoveryEvent) { let mut clean_up = vec![]; for (i, subscriber) in self.0.iter().enumerate() { // assume subscriber was dropped @@ -169,7 +169,7 @@ impl MdnsDiscovery { let mut last_id = 0; let mut senders: HashMap< PublicKey, - HashMap>>, + HashMap>>, > = HashMap::default(); let mut timeouts = JoinSet::new(); loop { @@ -231,6 +231,7 @@ impl MdnsDiscovery { "removing node from MdnsDiscovery address book" ); node_addrs.remove(&discovered_node_id); + subscribers.send(DiscoveryEvent::Expired(discovered_node_id)); continue; } @@ -254,7 +255,10 @@ impl MdnsDiscovery { trace!(?item, senders = senders.len(), "sending DiscoveryItem"); resolved = true; for sender in senders.values() { - sender.send(Ok(item.clone())).await.ok(); + sender + .send(Ok(DiscoveryEvent::Discovered(item.clone()))) + .await + .ok(); } } entry.or_insert(peer_info); @@ -263,7 +267,7 @@ impl MdnsDiscovery { // in other words, nodes sent to the `subscribers` should only be the ones that // have been "passively" discovered if !resolved { - subscribers.send(item); + subscribers.send(DiscoveryEvent::Discovered(item)); } } Message::Resolve(node_id, sender) => { @@ -273,7 +277,7 @@ impl MdnsDiscovery { if let Some(peer_info) = node_addrs.get(&node_id) { let item = peer_to_discovery_item(peer_info, &node_id); debug!(?item, "sending DiscoveryItem"); - sender.send(Ok(item)).await.ok(); + sender.send(Ok(DiscoveryEvent::Discovered(item))).await.ok(); } if let Some(senders_for_node_id) = senders.get_mut(&node_id) { senders_for_node_id.insert(id, sender); @@ -390,7 +394,10 @@ fn peer_to_discovery_item(peer: &Peer, node_id: &NodeId) -> DiscoveryItem { } impl Discovery for MdnsDiscovery { - fn resolve(&self, node_id: NodeId) -> Option>> { + fn resolve( + &self, + node_id: NodeId, + ) -> Option>> { use futures_util::FutureExt; let (send, recv) = mpsc::channel(20); @@ -409,7 +416,7 @@ impl Discovery for MdnsDiscovery { self.local_addrs.set(Some(data.clone())).ok(); } - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { use futures_util::FutureExt; let (sender, recv) = mpsc::channel(20); @@ -456,14 +463,22 @@ mod tests { tracing::debug!(?node_id_b, "Discovering node id b"); // publish discovery_b's address discovery_b.publish(&node_data); - let s1_res = tokio::time::timeout(Duration::from_secs(5), s1.next()) - .await - .context("timeout")? - .unwrap()?; - let s2_res = tokio::time::timeout(Duration::from_secs(5), s2.next()) - .await - .context("timeout")? - .unwrap()?; + let DiscoveryEvent::Discovered(s1_res) = + tokio::time::timeout(Duration::from_secs(5), s1.next()) + .await + .context("timeout")? + .unwrap()? + else { + panic!("Received unexpected discovery event"); + }; + let DiscoveryEvent::Discovered(s2_res) = + tokio::time::timeout(Duration::from_secs(5), s2.next()) + .await + .context("timeout")? + .unwrap()? + else { + panic!("Received unexpected discovery event"); + }; assert_eq!(s1_res.node_info().data, node_data); assert_eq!(s2_res.node_info().data, node_data); @@ -494,7 +509,7 @@ mod tests { let test = async move { let mut got_ids = BTreeSet::new(); while got_ids.len() != num_nodes { - if let Some(item) = events.next().await { + if let Some(DiscoveryEvent::Discovered(item)) = events.next().await { if node_ids.contains(&(item.node_id(), item.user_data())) { got_ids.insert((item.node_id(), item.user_data())); } diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index 893b7cbd1a0..6daaf9f9997 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -62,7 +62,7 @@ use snafu::{ResultExt, Snafu}; use tracing::{Instrument, debug, error_span, warn}; use url::Url; -use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError}; +use super::{DiscoveryContext, DiscoveryError, DiscoveryEvent, IntoDiscovery, IntoDiscoveryError}; #[cfg(not(wasm_browser))] use crate::dns::DnsResolver; use crate::{ @@ -500,14 +500,17 @@ impl PkarrResolver { } impl Discovery for PkarrResolver { - fn resolve(&self, node_id: NodeId) -> Option>> { + fn resolve( + &self, + node_id: NodeId, + ) -> Option>> { let pkarr_client = self.pkarr_client.clone(); let fut = async move { let signed_packet = pkarr_client.resolve(node_id).await?; let info = NodeInfo::from_pkarr_signed_packet(&signed_packet) .map_err(|err| DiscoveryError::from_err("pkarr", err))?; let item = DiscoveryItem::new(info, "pkarr", None); - Ok(item) + Ok(DiscoveryEvent::Discovered(item)) }; let stream = n0_future::stream::once_future(fut); Some(Box::pin(stream)) diff --git a/iroh/src/discovery/pkarr/dht.rs b/iroh/src/discovery/pkarr/dht.rs index 2114ad0e974..3ca6e92a15a 100644 --- a/iroh/src/discovery/pkarr/dht.rs +++ b/iroh/src/discovery/pkarr/dht.rs @@ -19,7 +19,7 @@ use url::Url; use crate::{ discovery::{ - Discovery, DiscoveryContext, DiscoveryError, DiscoveryItem, IntoDiscovery, + Discovery, DiscoveryContext, DiscoveryError, DiscoveryEvent, DiscoveryItem, IntoDiscovery, IntoDiscoveryError, NodeData, pkarr::{DEFAULT_PKARR_TTL, N0_DNS_PKARR_RELAY_PROD, N0_DNS_PKARR_RELAY_STAGING}, }, @@ -314,7 +314,10 @@ impl Discovery for DhtDiscovery { *task = Some(AbortOnDropHandle::new(curr)); } - fn resolve(&self, node_id: NodeId) -> Option>> { + fn resolve( + &self, + node_id: NodeId, + ) -> Option>> { let pkarr_public_key = pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key"); tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32()); @@ -323,6 +326,7 @@ impl Discovery for DhtDiscovery { discovery.resolve_pkarr(pkarr_public_key).await }) .filter_map(|x| x) + .map(|x| x.map(DiscoveryEvent::Discovered)) .boxed(); Some(stream) } @@ -371,6 +375,10 @@ mod tests { .collect::>() .await; for item in items.into_iter().flatten() { + let DiscoveryEvent::Discovered(item) = item else { + continue; + }; + if let Some(url) = item.relay_url() { found_relay_urls.insert(url.clone()); } diff --git a/iroh/src/discovery/static_provider.rs b/iroh/src/discovery/static_provider.rs index 773cca5c0a8..c140737eee1 100644 --- a/iroh/src/discovery/static_provider.rs +++ b/iroh/src/discovery/static_provider.rs @@ -22,7 +22,7 @@ use n0_future::{ time::SystemTime, }; -use super::{Discovery, DiscoveryError, DiscoveryItem, NodeData, NodeInfo}; +use super::{Discovery, DiscoveryError, DiscoveryEvent, DiscoveryItem, NodeData, NodeInfo}; /// A static node discovery to manually add node addressing information. /// @@ -184,7 +184,7 @@ impl Discovery for StaticProvider { fn resolve( &self, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let guard = self.nodes.read().expect("poisoned"); let info = guard.get(&node_id); match info { @@ -199,7 +199,7 @@ impl Discovery for StaticProvider { Self::PROVENANCE, Some(last_updated), ); - Some(stream::iter(Some(Ok(item))).boxed()) + Some(stream::iter(Some(Ok(DiscoveryEvent::Discovered(item)))).boxed()) } None => None, } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index f78115fdeab..2a39f403786 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::discovery::pkarr::PkarrResolver; use crate::{discovery::dns::DnsDiscovery, dns::DnsResolver}; use crate::{ discovery::{ - ConcurrentDiscovery, Discovery, DiscoveryContext, DiscoveryError, DiscoveryItem, + ConcurrentDiscovery, Discovery, DiscoveryContext, DiscoveryError, DiscoveryEvent, DiscoverySubscribers, DiscoveryTask, DynIntoDiscovery, IntoDiscovery, IntoDiscoveryError, Lagged, UserData, pkarr::PkarrPublisher, }, @@ -1117,7 +1117,7 @@ impl Endpoint { /// /// [`MdnsDiscovery`]: crate::discovery::mdns::MdnsDiscovery /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider - pub fn discovery_stream(&self) -> impl Stream> + use<> { + pub fn discovery_stream(&self) -> impl Stream> + use<> { self.msock.discovery_subscribers().subscribe() } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 2bf4ed47170..e40af41c22a 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -71,7 +71,7 @@ use crate::net_report::{IpMappedAddr, QuicConfig}; use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, SendAddr}, - discovery::{Discovery, DiscoveryItem, DiscoverySubscribers, NodeData, UserData}, + discovery::{Discovery, DiscoveryEvent, DiscoverySubscribers, NodeData, UserData}, key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box}, metrics::EndpointMetrics, net_report::{self, IfStateDetails, IpMappedAddresses, Report}, @@ -1741,7 +1741,7 @@ impl Actor { .port_mapper .watch_external_address(); - let mut discovery_events: BoxStream = Box::pin(n0_future::stream::empty()); + let mut discovery_events: BoxStream = Box::pin(n0_future::stream::empty()); if let Some(d) = self.msock.discovery() { if let Some(events) = d.subscribe() { discovery_events = events; @@ -1880,16 +1880,19 @@ impl Actor { // forever like we do with the other branches that yield `Option`s Some(discovery_item) = discovery_events.next() => { trace!("tick: discovery event, address discovered: {discovery_item:?}"); - let provenance = discovery_item.provenance(); - let node_addr = discovery_item.to_node_addr(); - if let Err(e) = self.msock.add_node_addr( - node_addr, - Source::Discovery { - name: provenance.to_string() - }) { + if let DiscoveryEvent::Discovered(discovery_item) = &discovery_item { + let provenance = discovery_item.provenance(); let node_addr = discovery_item.to_node_addr(); - warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}"); + if let Err(e) = self.msock.add_node_addr( + node_addr, + Source::Discovery { + name: provenance.to_string() + }) { + let node_addr = discovery_item.to_node_addr(); + warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}"); + } } + // Send the discovery item to the subscribers of the discovery broadcast stream. self.msock.discovery_subscribers.send(discovery_item); } diff --git a/iroh/tests/integration.rs b/iroh/tests/integration.rs index e38bfb4bea2..b987f28e96a 100644 --- a/iroh/tests/integration.rs +++ b/iroh/tests/integration.rs @@ -11,7 +11,7 @@ //! we won't hit these with only this integration test. use iroh::{ Endpoint, RelayMode, - discovery::{Discovery, pkarr::PkarrResolver}, + discovery::{Discovery, DiscoveryEvent, pkarr::PkarrResolver}, }; use n0_future::{ StreamExt, task, @@ -87,7 +87,7 @@ async fn simple_node_id_based_connection_transfer() -> Result { let Some(mut stream) = resolver.resolve(node_id) else { continue; }; - let Ok(Some(item)) = stream.try_next().await else { + let Ok(Some(DiscoveryEvent::Discovered(item))) = stream.try_next().await else { continue; }; if item.relay_url().is_some() { From c1ce6f64db147390c822d6c95bbfd29d9582ce13 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Mon, 28 Jul 2025 17:05:53 +0800 Subject: [PATCH 2/9] wip tests --- iroh/src/discovery.rs | 4 ++-- iroh/src/discovery/mdns.rs | 39 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index ff815d066f2..2b278da455c 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -360,7 +360,7 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static { impl Discovery for Arc {} /// An event emitted from [`Discovery`] services. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum DiscoveryEvent { /// A peer was discovered or it's information was updated. Discovered(DiscoveryItem), @@ -377,7 +377,7 @@ pub enum DiscoveryEvent { /// /// This struct derefs to [`NodeData`], so you can access the methods from [`NodeData`] /// directly from [`DiscoveryItem`]. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct DiscoveryItem { /// The node info for the node, as discovered by the the discovery service. node_info: NodeInfo, diff --git a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index 0e0aa2a4171..aa696b2bb61 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -485,6 +485,45 @@ mod tests { Ok(()) } + #[tokio::test] + #[traced_test] + async fn mdns_expiry() -> Result { + let (_, discovery_a) = make_discoverer()?; + let (node_id_b, discovery_b) = make_discoverer()?; + + // make addr info for discoverer b + let user_data: UserData = "foobar".parse()?; + let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()])) + .with_user_data(Some(user_data.clone())); + println!("info {node_data:?}"); + + // resolve twice to ensure we can create separate streams for the same node_id + let mut s1 = discovery_a.resolve(node_id_b).unwrap(); + + tracing::debug!(?node_id_b, "Discovering node id b"); + // publish discovery_b's address + discovery_b.publish(&node_data); + let DiscoveryEvent::Discovered(s1_res) = + tokio::time::timeout(Duration::from_secs(5), s1.next()) + .await + .context("timeout")? + .unwrap()? + else { + panic!("Received unexpected discovery event"); + }; + assert_eq!(s1_res.node_info().data, node_data); + + drop(discovery_b); + + let result = tokio::time::timeout(Duration::from_secs(30), s1.next()) + .await + .context("timeout")? + .unwrap()?; + assert_eq!(result, DiscoveryEvent::Expired(node_id_b)); + + Ok(()) + } + #[tokio::test] #[traced_test] async fn mdns_subscribe() -> Result { From cae97e3bb057eb572ed225952a8ff13c938c978a Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Thu, 31 Jul 2025 10:29:14 +0800 Subject: [PATCH 3/9] fix unit test --- iroh/src/discovery/mdns.rs | 82 +++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 28 deletions(-) diff --git a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index aa696b2bb61..93aef27ef0c 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -39,7 +39,7 @@ use std::{ use iroh_base::{NodeId, PublicKey}; use n0_future::{ boxed::BoxStream, - task::{self, AbortOnDropHandle, JoinSet}, + task::{self, JoinSet}, time::{self, Duration}, }; use n0_watcher::{Watchable, Watcher as _}; @@ -70,9 +70,8 @@ const DISCOVERY_DURATION: Duration = Duration::from_secs(10); /// Discovery using `swarm-discovery`, a variation on mdns #[derive(Debug)] pub struct MdnsDiscovery { - #[allow(dead_code)] - handle: AbortOnDropHandle<()>, sender: mpsc::Sender, + shutdown: mpsc::Sender<()>, /// When `local_addrs` changes, we re-publish our info. local_addrs: Watchable>, } @@ -156,6 +155,7 @@ impl MdnsDiscovery { pub fn new(node_id: NodeId) -> Result { debug!("Creating new MdnsDiscovery service"); let (send, mut recv) = mpsc::channel(64); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); let task_sender = send.clone(); let rt = tokio::runtime::Handle::current(); let discovery = @@ -193,12 +193,21 @@ impl MdnsDiscovery { } continue; } + _ = shutdown_rx.recv() => { + error!("MdnsDiscovery channel closed"); + error!("closing MdnsDiscovery"); + timeouts.abort_all(); + + return; + } }; let msg = match msg { None => { error!("MdnsDiscovery channel closed"); error!("closing MdnsDiscovery"); timeouts.abort_all(); + discovery.remove_all(); + return; } Some(msg) => msg, @@ -312,10 +321,10 @@ impl MdnsDiscovery { } } }; - let handle = task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); + task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); Ok(Self { - handle: AbortOnDropHandle::new(handle), sender: send, + shutdown: shutdown_tx, local_addrs, }) } @@ -368,6 +377,12 @@ impl MdnsDiscovery { } } +impl Drop for MdnsDiscovery { + fn drop(&mut self) { + self.shutdown.try_send(()).ok(); + } +} + fn peer_to_discovery_item(peer: &Peer, node_id: &NodeId) -> DiscoveryItem { let direct_addresses: BTreeSet = peer .addrs() @@ -454,7 +469,6 @@ mod tests { let user_data: UserData = "foobar".parse()?; let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()])) .with_user_data(Some(user_data.clone())); - println!("info {node_data:?}"); // resolve twice to ensure we can create separate streams for the same node_id let mut s1 = discovery_a.resolve(node_id_b).unwrap(); @@ -487,39 +501,51 @@ mod tests { #[tokio::test] #[traced_test] - async fn mdns_expiry() -> Result { + async fn mdns_publish_expire() -> Result { let (_, discovery_a) = make_discoverer()?; let (node_id_b, discovery_b) = make_discoverer()?; - // make addr info for discoverer b - let user_data: UserData = "foobar".parse()?; + // publish discovery_b's address let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()])) - .with_user_data(Some(user_data.clone())); - println!("info {node_data:?}"); - - // resolve twice to ensure we can create separate streams for the same node_id - let mut s1 = discovery_a.resolve(node_id_b).unwrap(); + .with_user_data(Some("".parse()?)); + discovery_b.publish(&node_data); + let mut s1 = discovery_a.subscribe().unwrap(); tracing::debug!(?node_id_b, "Discovering node id b"); - // publish discovery_b's address - discovery_b.publish(&node_data); - let DiscoveryEvent::Discovered(s1_res) = - tokio::time::timeout(Duration::from_secs(5), s1.next()) + + // Wait for the specific node to be discovered + loop { + let event = tokio::time::timeout(Duration::from_secs(5), s1.next()) .await .context("timeout")? - .unwrap()? - else { - panic!("Received unexpected discovery event"); - }; - assert_eq!(s1_res.node_info().data, node_data); + .expect("Stream should not be closed"); + match event { + DiscoveryEvent::Discovered(item) if item.node_info().node_id == node_id_b => { + break; + } + _ => continue, // Ignore other discovery events + } + } + + // Shutdown node B drop(discovery_b); + tokio::time::sleep(Duration::from_secs(5)).await; - let result = tokio::time::timeout(Duration::from_secs(30), s1.next()) - .await - .context("timeout")? - .unwrap()?; - assert_eq!(result, DiscoveryEvent::Expired(node_id_b)); + // Wait for the expiration event for the specific node + loop { + let event = tokio::time::timeout(Duration::from_secs(10), s1.next()) + .await + .context("timeout waiting for expiration event")? + .expect("Stream should not be closed"); + + match event { + DiscoveryEvent::Expired(expired_node_id) if expired_node_id == node_id_b => { + break; + } + _ => continue, // Ignore other events + } + } Ok(()) } From 4f297df237e03581ec9601ae3be6240cd9e39710 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Mon, 4 Aug 2025 00:02:43 +0800 Subject: [PATCH 4/9] fix comment --- iroh/src/endpoint.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 290b4133f16..38c1a890576 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -1096,7 +1096,7 @@ impl Endpoint { /// Returns a stream of all remote nodes discovered through the endpoint's discovery services. /// /// Whenever a node is discovered via the endpoint's discovery service, the corresponding - /// [`DiscoveryItem`] is yielded from this stream. This includes nodes discovered actively + /// [`DiscoveryEvent`] is yielded from this stream. This includes nodes discovered actively /// through [`Discovery::resolve`], which is invoked automatically when calling /// [`Endpoint::connect`] for a [`NodeId`] unknown to the endpoint. It also includes /// nodes that the endpoint discovers passively from discovery services that implement From 085f3152a5bd1a69f9f47a6a963a59b5f291deff Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Tue, 19 Aug 2025 01:28:21 +0800 Subject: [PATCH 5/9] add expiry to static provider --- iroh/src/discovery/static_provider.rs | 30 +++++++++++++++------------ 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/iroh/src/discovery/static_provider.rs b/iroh/src/discovery/static_provider.rs index c140737eee1..2c90e853701 100644 --- a/iroh/src/discovery/static_provider.rs +++ b/iroh/src/discovery/static_provider.rs @@ -67,7 +67,7 @@ use super::{Discovery, DiscoveryError, DiscoveryEvent, DiscoveryItem, NodeData, #[derive(Debug, Default, Clone)] #[repr(transparent)] pub struct StaticProvider { - nodes: Arc>>, + nodes: Arc>>>, } #[derive(Debug)] @@ -132,7 +132,7 @@ impl StaticProvider { let last_updated = SystemTime::now(); let NodeInfo { node_id, data } = node_info.into(); let mut guard = self.nodes.write().expect("poisoned"); - let previous = guard.insert(node_id, StoredNodeInfo { data, last_updated }); + let previous = guard.insert(node_id, Some(StoredNodeInfo { data, last_updated })); previous.map(|x| x.data) } @@ -147,16 +147,19 @@ impl StaticProvider { let mut guard = self.nodes.write().expect("poisoned"); match guard.entry(node_id) { Entry::Occupied(mut entry) => { - let existing = entry.get_mut(); - existing - .data - .add_direct_addresses(data.direct_addresses().iter().copied()); - existing.data.set_relay_url(data.relay_url().cloned()); - existing.data.set_user_data(data.user_data().cloned()); - existing.last_updated = last_updated; + if let Some(existing) = entry.get_mut() { + existing + .data + .add_direct_addresses(data.direct_addresses().iter().copied()); + existing.data.set_relay_url(data.relay_url().cloned()); + existing.data.set_user_data(data.user_data().cloned()); + existing.last_updated = last_updated; + } else { + entry.insert(Some(StoredNodeInfo { data, last_updated })); + } } Entry::Vacant(entry) => { - entry.insert(StoredNodeInfo { data, last_updated }); + entry.insert(Some(StoredNodeInfo { data, last_updated })); } } } @@ -164,7 +167,7 @@ impl StaticProvider { /// Returns node addressing information for the given node ID. pub fn get_node_info(&self, node_id: NodeId) -> Option { let guard = self.nodes.read().expect("poisoned"); - let info = guard.get(&node_id)?; + let info = guard.get(&node_id)?.as_ref()?; Some(NodeInfo::from_parts(node_id, info.data.clone())) } @@ -174,7 +177,7 @@ impl StaticProvider { pub fn remove_node_info(&self, node_id: NodeId) -> Option { let mut guard = self.nodes.write().expect("poisoned"); let info = guard.remove(&node_id)?; - Some(NodeInfo::from_parts(node_id, info.data)) + Some(NodeInfo::from_parts(node_id, info?.data)) } } @@ -188,7 +191,7 @@ impl Discovery for StaticProvider { let guard = self.nodes.read().expect("poisoned"); let info = guard.get(&node_id); match info { - Some(node_info) => { + Some(Some(node_info)) => { let last_updated = node_info .last_updated .duration_since(SystemTime::UNIX_EPOCH) @@ -201,6 +204,7 @@ impl Discovery for StaticProvider { ); Some(stream::iter(Some(Ok(DiscoveryEvent::Discovered(item)))).boxed()) } + Some(None) => Some(stream::iter(Some(Ok(DiscoveryEvent::Expired(node_id)))).boxed()), None => None, } } From 80057f1fbc6f8a60092a52eaa2837e364f656d0d Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Sun, 7 Sep 2025 15:27:18 +0800 Subject: [PATCH 6/9] apply fixes from PR discussion --- iroh/src/discovery.rs | 34 +++++++++++-------------- iroh/src/discovery/dns.rs | 11 +++----- iroh/src/discovery/mdns.rs | 36 ++++++++++++++++----------- iroh/src/discovery/pkarr.rs | 9 +++---- iroh/src/discovery/pkarr/dht.rs | 12 ++------- iroh/src/discovery/static_provider.rs | 10 ++++---- iroh/tests/integration.rs | 4 +-- 7 files changed, 51 insertions(+), 65 deletions(-) diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index aacdcc821b8..2948e340f7c 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -326,7 +326,7 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static { fn resolve( &self, _node_id: NodeId, - ) -> Option>> { + ) -> Option>> { None } @@ -498,10 +498,7 @@ impl Discovery for ConcurrentDiscovery { } } - fn resolve( - &self, - node_id: NodeId, - ) -> Option>> { + fn resolve(&self, node_id: NodeId) -> Option>> { let streams = self .services .iter() @@ -605,7 +602,7 @@ impl DiscoveryTask { fn create_stream( ep: &Endpoint, node_id: NodeId, - ) -> Result>, DiscoveryError> { + ) -> Result>, DiscoveryError> { let discovery = ep.discovery().ok_or(NoServiceConfiguredSnafu.build())?; let stream = discovery .resolve(node_id) @@ -654,22 +651,21 @@ impl DiscoveryTask { loop { match stream.next().await { Some(Ok(r)) => { - if let DiscoveryEvent::Discovered(r) = &r { - let provenance = r.provenance; - let node_addr = r.to_node_addr(); - if node_addr.is_empty() { - debug!(%provenance, "empty address found"); - continue; - } - debug!(%provenance, addr = ?node_addr, "new address found"); - ep.add_node_addr_with_source(node_addr, provenance).ok(); + let provenance = r.provenance; + let node_addr = r.to_node_addr(); + if node_addr.is_empty() { + debug!(%provenance, "empty address found"); + continue; } + debug!(%provenance, addr = ?node_addr, "new address found"); + ep.add_node_addr_with_source(node_addr, provenance).ok(); if let Some(tx) = on_first_tx.take() { tx.send(Ok(())).ok(); } // Send the discovery item to the subscribers of the discovery broadcast stream. - ep.discovery_subscribers().send(r); + ep.discovery_subscribers() + .send(DiscoveryEvent::Discovered(r)); } Some(Err(err)) => { warn!(?err, "discovery service produced error"); @@ -816,7 +812,7 @@ mod tests { fn resolve( &self, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let addr_info = if self.resolve_wrong { let ts = system_time_now() - 100_000; let port: u16 = rand::thread_rng().gen_range(10_000..20_000); @@ -838,7 +834,7 @@ mod tests { let fut = async move { time::sleep(delay).await; tracing::debug!("resolve: {} = {item:?}", node_id.fmt_short()); - Ok(DiscoveryEvent::Discovered(item)) + Ok(item) }; n0_future::stream::once_future(fut).boxed() } @@ -864,7 +860,7 @@ mod tests { fn resolve( &self, _node_id: NodeId, - ) -> Option>> { + ) -> Option>> { Some(n0_future::stream::empty().boxed()) } } diff --git a/iroh/src/discovery/dns.rs b/iroh/src/discovery/dns.rs index 7ab9db538db..24f629b85bc 100644 --- a/iroh/src/discovery/dns.rs +++ b/iroh/src/discovery/dns.rs @@ -5,7 +5,7 @@ use iroh_relay::dns::DnsResolver; pub use iroh_relay::dns::{N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}; use n0_future::boxed::BoxStream; -use super::{DiscoveryContext, DiscoveryError, DiscoveryEvent, IntoDiscovery, IntoDiscoveryError}; +use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError}; use crate::{ discovery::{Discovery, DiscoveryItem}, endpoint::force_staging_infra, @@ -105,10 +105,7 @@ impl IntoDiscovery for DnsDiscoveryBuilder { } impl Discovery for DnsDiscovery { - fn resolve( - &self, - node_id: NodeId, - ) -> Option>> { + fn resolve(&self, node_id: NodeId) -> Option>> { let resolver = self.dns_resolver.clone(); let origin_domain = self.origin_domain.clone(); let fut = async move { @@ -116,9 +113,7 @@ impl Discovery for DnsDiscovery { .lookup_node_by_id_staggered(&node_id, &origin_domain, DNS_STAGGERING_MS) .await .map_err(|e| DiscoveryError::from_err("dns", e))?; - Ok(DiscoveryEvent::Discovered(DiscoveryItem::new( - node_info, "dns", None, - ))) + Ok(DiscoveryItem::new(node_info, "dns", None)) }; let stream = n0_future::stream::once_future(fut); Some(Box::pin(stream)) diff --git a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index 93aef27ef0c..e6ffeecc513 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -79,7 +79,7 @@ pub struct MdnsDiscovery { #[derive(Debug)] enum Message { Discovery(String, Peer), - Resolve(NodeId, mpsc::Sender>), + Resolve(NodeId, mpsc::Sender>), Timeout(NodeId, usize), Subscribe(mpsc::Sender), } @@ -169,7 +169,7 @@ impl MdnsDiscovery { let mut last_id = 0; let mut senders: HashMap< PublicKey, - HashMap>>, + HashMap>>, > = HashMap::default(); let mut timeouts = JoinSet::new(); loop { @@ -264,10 +264,7 @@ impl MdnsDiscovery { trace!(?item, senders = senders.len(), "sending DiscoveryItem"); resolved = true; for sender in senders.values() { - sender - .send(Ok(DiscoveryEvent::Discovered(item.clone()))) - .await - .ok(); + sender.send(Ok(item.clone())).await.ok(); } } entry.or_insert(peer_info); @@ -286,7 +283,7 @@ impl MdnsDiscovery { if let Some(peer_info) = node_addrs.get(&node_id) { let item = peer_to_discovery_item(peer_info, &node_id); debug!(?item, "sending DiscoveryItem"); - sender.send(Ok(DiscoveryEvent::Discovered(item))).await.ok(); + sender.send(Ok(item)).await.ok(); } if let Some(senders_for_node_id) = senders.get_mut(&node_id) { senders_for_node_id.insert(id, sender); @@ -409,10 +406,7 @@ fn peer_to_discovery_item(peer: &Peer, node_id: &NodeId) -> DiscoveryItem { } impl Discovery for MdnsDiscovery { - fn resolve( - &self, - node_id: NodeId, - ) -> Option>> { + fn resolve(&self, node_id: NodeId) -> Option>> { use futures_util::FutureExt; let (send, recv) = mpsc::channel(20); @@ -471,8 +465,20 @@ mod tests { .with_user_data(Some(user_data.clone())); // resolve twice to ensure we can create separate streams for the same node_id - let mut s1 = discovery_a.resolve(node_id_b).unwrap(); - let mut s2 = discovery_a.resolve(node_id_b).unwrap(); + let mut s1 = discovery_a + .subscribe() + .unwrap() + .filter(|event| match event { + DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b, + _ => false, + }); + let mut s2 = discovery_a + .subscribe() + .unwrap() + .filter(|event| match event { + DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b, + _ => false, + }); tracing::debug!(?node_id_b, "Discovering node id b"); // publish discovery_b's address @@ -481,7 +487,7 @@ mod tests { tokio::time::timeout(Duration::from_secs(5), s1.next()) .await .context("timeout")? - .unwrap()? + .unwrap() else { panic!("Received unexpected discovery event"); }; @@ -489,7 +495,7 @@ mod tests { tokio::time::timeout(Duration::from_secs(5), s2.next()) .await .context("timeout")? - .unwrap()? + .unwrap() else { panic!("Received unexpected discovery event"); }; diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index 9ab10409385..29d2ccd275f 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -62,7 +62,7 @@ use snafu::{ResultExt, Snafu}; use tracing::{Instrument, debug, error_span, warn}; use url::Url; -use super::{DiscoveryContext, DiscoveryError, DiscoveryEvent, IntoDiscovery, IntoDiscoveryError}; +use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError}; #[cfg(not(wasm_browser))] use crate::dns::DnsResolver; use crate::{ @@ -500,17 +500,14 @@ impl PkarrResolver { } impl Discovery for PkarrResolver { - fn resolve( - &self, - node_id: NodeId, - ) -> Option>> { + fn resolve(&self, node_id: NodeId) -> Option>> { let pkarr_client = self.pkarr_client.clone(); let fut = async move { let signed_packet = pkarr_client.resolve(node_id).await?; let info = NodeInfo::from_pkarr_signed_packet(&signed_packet) .map_err(|err| DiscoveryError::from_err("pkarr", err))?; let item = DiscoveryItem::new(info, "pkarr", None); - Ok(DiscoveryEvent::Discovered(item)) + Ok(item) }; let stream = n0_future::stream::once_future(fut); Some(Box::pin(stream)) diff --git a/iroh/src/discovery/pkarr/dht.rs b/iroh/src/discovery/pkarr/dht.rs index f0933b0f9e3..f9a2ceda2d7 100644 --- a/iroh/src/discovery/pkarr/dht.rs +++ b/iroh/src/discovery/pkarr/dht.rs @@ -19,7 +19,7 @@ use url::Url; use crate::{ discovery::{ - Discovery, DiscoveryContext, DiscoveryError, DiscoveryEvent, DiscoveryItem, IntoDiscovery, + Discovery, DiscoveryContext, DiscoveryError, DiscoveryItem, IntoDiscovery, IntoDiscoveryError, NodeData, pkarr::{DEFAULT_PKARR_TTL, N0_DNS_PKARR_RELAY_PROD, N0_DNS_PKARR_RELAY_STAGING}, }, @@ -314,10 +314,7 @@ impl Discovery for DhtDiscovery { *task = Some(AbortOnDropHandle::new(curr)); } - fn resolve( - &self, - node_id: NodeId, - ) -> Option>> { + fn resolve(&self, node_id: NodeId) -> Option>> { let pkarr_public_key = pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key"); tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32()); @@ -326,7 +323,6 @@ impl Discovery for DhtDiscovery { discovery.resolve_pkarr(pkarr_public_key).await }) .filter_map(|x| x) - .map(|x| x.map(DiscoveryEvent::Discovered)) .boxed(); Some(stream) } @@ -375,10 +371,6 @@ mod tests { .collect::>() .await; for item in items.into_iter().flatten() { - let DiscoveryEvent::Discovered(item) = item else { - continue; - }; - if let Some(url) = item.relay_url() { found_relay_urls.insert(url.clone()); } diff --git a/iroh/src/discovery/static_provider.rs b/iroh/src/discovery/static_provider.rs index 2c90e853701..5a81da2e5dc 100644 --- a/iroh/src/discovery/static_provider.rs +++ b/iroh/src/discovery/static_provider.rs @@ -22,7 +22,7 @@ use n0_future::{ time::SystemTime, }; -use super::{Discovery, DiscoveryError, DiscoveryEvent, DiscoveryItem, NodeData, NodeInfo}; +use super::{Discovery, DiscoveryError, DiscoveryItem, NodeData, NodeInfo}; /// A static node discovery to manually add node addressing information. /// @@ -133,7 +133,7 @@ impl StaticProvider { let NodeInfo { node_id, data } = node_info.into(); let mut guard = self.nodes.write().expect("poisoned"); let previous = guard.insert(node_id, Some(StoredNodeInfo { data, last_updated })); - previous.map(|x| x.data) + previous.and_then(|x| x.map(|x| x.data)) } /// Augments node addressing information for the given node ID. @@ -187,7 +187,7 @@ impl Discovery for StaticProvider { fn resolve( &self, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let guard = self.nodes.read().expect("poisoned"); let info = guard.get(&node_id); match info { @@ -202,9 +202,9 @@ impl Discovery for StaticProvider { Self::PROVENANCE, Some(last_updated), ); - Some(stream::iter(Some(Ok(DiscoveryEvent::Discovered(item)))).boxed()) + Some(stream::iter(Some(Ok(item))).boxed()) } - Some(None) => Some(stream::iter(Some(Ok(DiscoveryEvent::Expired(node_id)))).boxed()), + Some(None) => Some(stream::iter(None).boxed()), None => None, } } diff --git a/iroh/tests/integration.rs b/iroh/tests/integration.rs index b987f28e96a..e38bfb4bea2 100644 --- a/iroh/tests/integration.rs +++ b/iroh/tests/integration.rs @@ -11,7 +11,7 @@ //! we won't hit these with only this integration test. use iroh::{ Endpoint, RelayMode, - discovery::{Discovery, DiscoveryEvent, pkarr::PkarrResolver}, + discovery::{Discovery, pkarr::PkarrResolver}, }; use n0_future::{ StreamExt, task, @@ -87,7 +87,7 @@ async fn simple_node_id_based_connection_transfer() -> Result { let Some(mut stream) = resolver.resolve(node_id) else { continue; }; - let Ok(Some(DiscoveryEvent::Discovered(item))) = stream.try_next().await else { + let Ok(Some(item)) = stream.try_next().await else { continue; }; if item.relay_url().is_some() { From 20eb5729fbd27944252db470024ad3c926d95da6 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Sun, 14 Sep 2025 13:23:12 +0800 Subject: [PATCH 7/9] nit --- iroh/src/discovery/mdns.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index e6ffeecc513..3b23dc2b355 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -197,7 +197,6 @@ impl MdnsDiscovery { error!("MdnsDiscovery channel closed"); error!("closing MdnsDiscovery"); timeouts.abort_all(); - return; } }; @@ -207,7 +206,6 @@ impl MdnsDiscovery { error!("closing MdnsDiscovery"); timeouts.abort_all(); discovery.remove_all(); - return; } Some(msg) => msg, From 3f24253dcf131045ad47cab927662f24f5c5b682 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Sun, 14 Sep 2025 13:37:51 +0800 Subject: [PATCH 8/9] move back to `AbortOnDrop` --- iroh/src/discovery/mdns.rs | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index b5a56a01582..bc9fa49c5c2 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -39,7 +39,7 @@ use std::{ use iroh_base::{NodeId, PublicKey}; use n0_future::{ boxed::BoxStream, - task::{self, JoinSet}, + task::{self, AbortOnDropHandle, JoinSet}, time::{self, Duration}, }; use n0_watcher::{Watchable, Watcher as _}; @@ -70,9 +70,10 @@ const DISCOVERY_DURATION: Duration = Duration::from_secs(10); /// Discovery using `swarm-discovery`, a variation on mdns #[derive(Debug)] pub struct MdnsDiscovery { + #[allow(dead_code)] + handle: AbortOnDropHandle<()>, sender: mpsc::Sender, advertise: bool, - shutdown: mpsc::Sender<()>, /// When `local_addrs` changes, we re-publish our info. local_addrs: Watchable>, } @@ -185,7 +186,6 @@ impl MdnsDiscovery { pub fn new(node_id: NodeId, advertise: bool) -> Result { debug!("Creating new MdnsDiscovery service"); let (send, mut recv) = mpsc::channel(64); - let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); let task_sender = send.clone(); let rt = tokio::runtime::Handle::current(); let discovery = MdnsDiscovery::spawn_discoverer( @@ -228,12 +228,6 @@ impl MdnsDiscovery { } continue; } - _ = shutdown_rx.recv() => { - error!("MdnsDiscovery channel closed"); - error!("closing MdnsDiscovery"); - timeouts.abort_all(); - return; - } }; let msg = match msg { None => { @@ -351,11 +345,11 @@ impl MdnsDiscovery { } } }; - task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); + let handle = task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); Ok(Self { + handle: AbortOnDropHandle::new(handle), sender: send, advertise, - shutdown: shutdown_tx, local_addrs, }) } @@ -411,12 +405,6 @@ impl MdnsDiscovery { } } -impl Drop for MdnsDiscovery { - fn drop(&mut self) { - self.shutdown.try_send(()).ok(); - } -} - fn peer_to_discovery_item(peer: &Peer, node_id: &NodeId) -> DiscoveryItem { let direct_addresses: BTreeSet = peer .addrs() @@ -549,8 +537,8 @@ mod tests { #[tokio::test] #[traced_test] async fn mdns_publish_expire() -> Result { - let (_, discovery_a) = make_discoverer()?; - let (node_id_b, discovery_b) = make_discoverer()?; + let (_, discovery_a) = make_discoverer(false)?; + let (node_id_b, discovery_b) = make_discoverer(true)?; // publish discovery_b's address let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()])) From 862ecb9a861ce3d5e8aa597c96546b417a17a743 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Sun, 14 Sep 2025 13:45:01 +0800 Subject: [PATCH 9/9] revert static provider changes --- iroh/src/discovery/static_provider.rs | 32 ++++++++++++--------------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/iroh/src/discovery/static_provider.rs b/iroh/src/discovery/static_provider.rs index 5a81da2e5dc..773cca5c0a8 100644 --- a/iroh/src/discovery/static_provider.rs +++ b/iroh/src/discovery/static_provider.rs @@ -67,7 +67,7 @@ use super::{Discovery, DiscoveryError, DiscoveryItem, NodeData, NodeInfo}; #[derive(Debug, Default, Clone)] #[repr(transparent)] pub struct StaticProvider { - nodes: Arc>>>, + nodes: Arc>>, } #[derive(Debug)] @@ -132,8 +132,8 @@ impl StaticProvider { let last_updated = SystemTime::now(); let NodeInfo { node_id, data } = node_info.into(); let mut guard = self.nodes.write().expect("poisoned"); - let previous = guard.insert(node_id, Some(StoredNodeInfo { data, last_updated })); - previous.and_then(|x| x.map(|x| x.data)) + let previous = guard.insert(node_id, StoredNodeInfo { data, last_updated }); + previous.map(|x| x.data) } /// Augments node addressing information for the given node ID. @@ -147,19 +147,16 @@ impl StaticProvider { let mut guard = self.nodes.write().expect("poisoned"); match guard.entry(node_id) { Entry::Occupied(mut entry) => { - if let Some(existing) = entry.get_mut() { - existing - .data - .add_direct_addresses(data.direct_addresses().iter().copied()); - existing.data.set_relay_url(data.relay_url().cloned()); - existing.data.set_user_data(data.user_data().cloned()); - existing.last_updated = last_updated; - } else { - entry.insert(Some(StoredNodeInfo { data, last_updated })); - } + let existing = entry.get_mut(); + existing + .data + .add_direct_addresses(data.direct_addresses().iter().copied()); + existing.data.set_relay_url(data.relay_url().cloned()); + existing.data.set_user_data(data.user_data().cloned()); + existing.last_updated = last_updated; } Entry::Vacant(entry) => { - entry.insert(Some(StoredNodeInfo { data, last_updated })); + entry.insert(StoredNodeInfo { data, last_updated }); } } } @@ -167,7 +164,7 @@ impl StaticProvider { /// Returns node addressing information for the given node ID. pub fn get_node_info(&self, node_id: NodeId) -> Option { let guard = self.nodes.read().expect("poisoned"); - let info = guard.get(&node_id)?.as_ref()?; + let info = guard.get(&node_id)?; Some(NodeInfo::from_parts(node_id, info.data.clone())) } @@ -177,7 +174,7 @@ impl StaticProvider { pub fn remove_node_info(&self, node_id: NodeId) -> Option { let mut guard = self.nodes.write().expect("poisoned"); let info = guard.remove(&node_id)?; - Some(NodeInfo::from_parts(node_id, info?.data)) + Some(NodeInfo::from_parts(node_id, info.data)) } } @@ -191,7 +188,7 @@ impl Discovery for StaticProvider { let guard = self.nodes.read().expect("poisoned"); let info = guard.get(&node_id); match info { - Some(Some(node_info)) => { + Some(node_info) => { let last_updated = node_info .last_updated .duration_since(SystemTime::UNIX_EPOCH) @@ -204,7 +201,6 @@ impl Discovery for StaticProvider { ); Some(stream::iter(Some(Ok(item))).boxed()) } - Some(None) => Some(stream::iter(None).boxed()), None => None, } }