diff --git a/iroh/examples/locally-discovered-nodes.rs b/iroh/examples/locally-discovered-nodes.rs index 8eae4679d1..5257f6b128 100644 --- a/iroh/examples/locally-discovered-nodes.rs +++ b/iroh/examples/locally-discovered-nodes.rs @@ -5,7 +5,7 @@ //! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`MdnsDiscovery`] enabled, it may discover those nodes as well. use std::time::Duration; -use iroh::{Endpoint, NodeId, node_info::UserData}; +use iroh::{Endpoint, NodeId, discovery::DiscoveryEvent, node_info::UserData}; use n0_future::StreamExt; use n0_snafu::Result; use tokio::task::JoinSet; @@ -32,7 +32,7 @@ async fn main() -> Result<()> { tracing::error!("{e}"); return; } - Ok(item) => { + Ok(DiscoveryEvent::Discovered(item)) => { // if there is no user data, or the user data // does not indicate that the discovered node // is a part of the example, ignore it @@ -53,6 +53,7 @@ async fn main() -> Result<()> { println!("Found node {}!", item.node_id().fmt_short()); } } + Ok(DiscoveryEvent::Expired(_)) => {} }; } }); diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index a1907c2728..2948e340f7 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -352,13 +352,23 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static { /// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system /// and add the discovered addresses to the internal address book as they arrive /// on this stream. - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { None } } impl Discovery for Arc {} +/// An event emitted from [`Discovery`] services. +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum DiscoveryEvent { + /// A peer was discovered or it's information was updated. + Discovered(DiscoveryItem), + /// A peer was expired due to being inactive, unreachable, or otherwise + /// unavailable. + Expired(NodeId), +} + /// Node discovery results from [`Discovery`] services. /// /// This is the item in the streams returned from [`Discovery::resolve`] and @@ -367,7 +377,7 @@ impl Discovery for Arc {} /// /// This struct derefs to [`NodeData`], so you can access the methods from [`NodeData`] /// directly from [`DiscoveryItem`]. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct DiscoveryItem { /// The node info for the node, as discovered by the the discovery service. node_info: NodeInfo, @@ -498,7 +508,7 @@ impl Discovery for ConcurrentDiscovery { Some(Box::pin(streams)) } - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { let mut streams = vec![]; for service in self.services.iter() { if let Some(stream) = service.subscribe() { @@ -649,11 +659,13 @@ impl DiscoveryTask { } debug!(%provenance, addr = ?node_addr, "new address found"); ep.add_node_addr_with_source(node_addr, provenance).ok(); + if let Some(tx) = on_first_tx.take() { tx.send(Ok(())).ok(); } // Send the discovery item to the subscribers of the discovery broadcast stream. - ep.discovery_subscribers().send(r); + ep.discovery_subscribers() + .send(DiscoveryEvent::Discovered(r)); } Some(Err(err)) => { warn!(?err, "discovery service produced error"); @@ -685,7 +697,7 @@ pub struct Lagged { #[derive(Clone, Debug)] pub(super) struct DiscoverySubscribers { - inner: tokio::sync::broadcast::Sender, + inner: tokio::sync::broadcast::Sender, } impl DiscoverySubscribers { @@ -699,13 +711,13 @@ impl DiscoverySubscribers { } } - pub(crate) fn subscribe(&self) -> impl Stream> + use<> { + pub(crate) fn subscribe(&self) -> impl Stream> + use<> { use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; let recv = self.inner.subscribe(); BroadcastStream::new(recv).map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged { val: n }) } - pub(crate) fn send(&self, item: DiscoveryItem) { + pub(crate) fn send(&self, item: DiscoveryEvent) { // `broadcast::Sender::send` returns an error if the channel has no subscribers, // which we don't care about. self.inner.send(item).ok(); @@ -737,7 +749,7 @@ mod tests { #[derive(Debug, Clone)] struct TestDiscoveryShared { nodes: Arc>, - watchers: tokio::sync::broadcast::Sender, + watchers: tokio::sync::broadcast::Sender, } impl Default for TestDiscoveryShared { @@ -770,7 +782,7 @@ mod tests { } } - pub fn send_passive(&self, item: DiscoveryItem) { + pub fn send_passive(&self, item: DiscoveryEvent) { self.watchers.send(item).ok(); } } @@ -831,7 +843,7 @@ mod tests { Some(stream) } - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { let recv = self.shared.watchers.subscribe(); let stream = tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|item| item.ok()); @@ -1016,11 +1028,16 @@ mod tests { ep2.node_addr().initialized().await; let _ = ep1.connect(ep2.node_id(), TEST_ALPN).await?; - let item = tokio::time::timeout(Duration::from_secs(1), stream.next()) - .await - .expect("timeout") - .expect("stream closed") - .expect("stream lagged"); + let DiscoveryEvent::Discovered(item) = + tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .expect("timeout") + .expect("stream closed") + .expect("stream lagged") + else { + panic!("Returned unexpected discovery event!"); + }; + assert_eq!(item.node_id(), ep2.node_id()); assert_eq!(item.provenance(), "test-disco"); @@ -1028,13 +1045,17 @@ mod tests { let passive_node_id = SecretKey::generate(rand::thread_rng()).public(); let node_info = NodeInfo::new(passive_node_id); let passive_item = DiscoveryItem::new(node_info, "test-disco-passive", None); - disco_shared.send_passive(passive_item.clone()); - - let item = tokio::time::timeout(Duration::from_secs(1), stream.next()) - .await - .expect("timeout") - .expect("stream closed") - .expect("stream lagged"); + disco_shared.send_passive(DiscoveryEvent::Discovered(passive_item.clone())); + + let DiscoveryEvent::Discovered(item) = + tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .expect("timeout") + .expect("stream closed") + .expect("stream lagged") + else { + panic!("Returned unexpected discovery event!"); + }; assert_eq!(item.node_id(), passive_node_id); assert_eq!(item.provenance(), "test-disco-passive"); diff --git a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index 25c892db0e..bc9fa49c5c 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -48,7 +48,7 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{Instrument, debug, error, info_span, trace, warn}; use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError}; -use crate::discovery::{Discovery, DiscoveryItem, NodeData, NodeInfo}; +use crate::discovery::{Discovery, DiscoveryEvent, DiscoveryItem, NodeData, NodeInfo}; /// The n0 local swarm node discovery name const N0_LOCAL_SWARM: &str = "iroh.local.swarm"; @@ -83,12 +83,12 @@ enum Message { Discovery(String, Peer), Resolve(NodeId, mpsc::Sender>), Timeout(NodeId, usize), - Subscribe(mpsc::Sender), + Subscribe(mpsc::Sender), } /// Manages the list of subscribers that are subscribed to this discovery service. #[derive(Debug)] -struct Subscribers(Vec>); +struct Subscribers(Vec>); impl Subscribers { fn new() -> Self { @@ -96,14 +96,14 @@ impl Subscribers { } /// Add the subscriber to the list of subscribers - fn push(&mut self, subscriber: mpsc::Sender) { + fn push(&mut self, subscriber: mpsc::Sender) { self.0.push(subscriber); } /// Sends the `node_id` and `item` to each subscriber. /// /// Cleans up any subscribers that have been dropped. - fn send(&mut self, item: DiscoveryItem) { + fn send(&mut self, item: DiscoveryEvent) { let mut clean_up = vec![]; for (i, subscriber) in self.0.iter().enumerate() { // assume subscriber was dropped @@ -234,6 +234,7 @@ impl MdnsDiscovery { error!("MdnsDiscovery channel closed"); error!("closing MdnsDiscovery"); timeouts.abort_all(); + discovery.remove_all(); return; } Some(msg) => msg, @@ -266,6 +267,7 @@ impl MdnsDiscovery { "removing node from MdnsDiscovery address book" ); node_addrs.remove(&discovered_node_id); + subscribers.send(DiscoveryEvent::Expired(discovered_node_id)); continue; } @@ -298,7 +300,7 @@ impl MdnsDiscovery { // in other words, nodes sent to the `subscribers` should only be the ones that // have been "passively" discovered if !resolved { - subscribers.send(item); + subscribers.send(DiscoveryEvent::Discovered(item)); } } Message::Resolve(node_id, sender) => { @@ -346,8 +348,8 @@ impl MdnsDiscovery { let handle = task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); Ok(Self { handle: AbortOnDropHandle::new(handle), - advertise, sender: send, + advertise, local_addrs, }) } @@ -450,7 +452,7 @@ impl Discovery for MdnsDiscovery { } } - fn subscribe(&self) -> Option> { + fn subscribe(&self) -> Option> { use futures_util::FutureExt; let (sender, recv) = mpsc::channel(20); @@ -490,29 +492,99 @@ mod tests { let user_data: UserData = "foobar".parse()?; let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()])) .with_user_data(Some(user_data.clone())); - println!("info {node_data:?}"); // resolve twice to ensure we can create separate streams for the same node_id - let mut s1 = discovery_a.resolve(node_id_b).unwrap(); - let mut s2 = discovery_a.resolve(node_id_b).unwrap(); + let mut s1 = discovery_a + .subscribe() + .unwrap() + .filter(|event| match event { + DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b, + _ => false, + }); + let mut s2 = discovery_a + .subscribe() + .unwrap() + .filter(|event| match event { + DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b, + _ => false, + }); tracing::debug!(?node_id_b, "Discovering node id b"); // publish discovery_b's address discovery_b.publish(&node_data); - let s1_res = tokio::time::timeout(Duration::from_secs(5), s1.next()) - .await - .context("timeout")? - .unwrap()?; - let s2_res = tokio::time::timeout(Duration::from_secs(5), s2.next()) - .await - .context("timeout")? - .unwrap()?; + let DiscoveryEvent::Discovered(s1_res) = + tokio::time::timeout(Duration::from_secs(5), s1.next()) + .await + .context("timeout")? + .unwrap() + else { + panic!("Received unexpected discovery event"); + }; + let DiscoveryEvent::Discovered(s2_res) = + tokio::time::timeout(Duration::from_secs(5), s2.next()) + .await + .context("timeout")? + .unwrap() + else { + panic!("Received unexpected discovery event"); + }; assert_eq!(s1_res.node_info().data, node_data); assert_eq!(s2_res.node_info().data, node_data); Ok(()) } + #[tokio::test] + #[traced_test] + async fn mdns_publish_expire() -> Result { + let (_, discovery_a) = make_discoverer(false)?; + let (node_id_b, discovery_b) = make_discoverer(true)?; + + // publish discovery_b's address + let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()])) + .with_user_data(Some("".parse()?)); + discovery_b.publish(&node_data); + + let mut s1 = discovery_a.subscribe().unwrap(); + tracing::debug!(?node_id_b, "Discovering node id b"); + + // Wait for the specific node to be discovered + loop { + let event = tokio::time::timeout(Duration::from_secs(5), s1.next()) + .await + .context("timeout")? + .expect("Stream should not be closed"); + + match event { + DiscoveryEvent::Discovered(item) if item.node_info().node_id == node_id_b => { + break; + } + _ => continue, // Ignore other discovery events + } + } + + // Shutdown node B + drop(discovery_b); + tokio::time::sleep(Duration::from_secs(5)).await; + + // Wait for the expiration event for the specific node + loop { + let event = tokio::time::timeout(Duration::from_secs(10), s1.next()) + .await + .context("timeout waiting for expiration event")? + .expect("Stream should not be closed"); + + match event { + DiscoveryEvent::Expired(expired_node_id) if expired_node_id == node_id_b => { + break; + } + _ => continue, // Ignore other events + } + } + + Ok(()) + } + #[tokio::test] #[traced_test] async fn mdns_subscribe() -> Result { @@ -537,7 +609,7 @@ mod tests { let test = async move { let mut got_ids = BTreeSet::new(); while got_ids.len() != num_nodes { - if let Some(item) = events.next().await { + if let Some(DiscoveryEvent::Discovered(item)) = events.next().await { if node_ids.contains(&(item.node_id(), item.user_data())) { got_ids.insert((item.node_id(), item.user_data())); } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 3636e9ab48..ba1206f480 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::discovery::pkarr::PkarrResolver; use crate::{discovery::dns::DnsDiscovery, dns::DnsResolver}; use crate::{ discovery::{ - ConcurrentDiscovery, Discovery, DiscoveryContext, DiscoveryError, DiscoveryItem, + ConcurrentDiscovery, Discovery, DiscoveryContext, DiscoveryError, DiscoveryEvent, DiscoverySubscribers, DiscoveryTask, DynIntoDiscovery, IntoDiscovery, IntoDiscoveryError, Lagged, UserData, pkarr::PkarrPublisher, }, @@ -1109,7 +1109,7 @@ impl Endpoint { /// Returns a stream of all remote nodes discovered through the endpoint's discovery services. /// /// Whenever a node is discovered via the endpoint's discovery service, the corresponding - /// [`DiscoveryItem`] is yielded from this stream. This includes nodes discovered actively + /// [`DiscoveryEvent`] is yielded from this stream. This includes nodes discovered actively /// through [`Discovery::resolve`], which is invoked automatically when calling /// [`Endpoint::connect`] for a [`NodeId`] unknown to the endpoint. It also includes /// nodes that the endpoint discovers passively from discovery services that implement @@ -1130,7 +1130,7 @@ impl Endpoint { /// /// [`MdnsDiscovery`]: crate::discovery::mdns::MdnsDiscovery /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider - pub fn discovery_stream(&self) -> impl Stream> + use<> { + pub fn discovery_stream(&self) -> impl Stream> + use<> { self.msock.discovery_subscribers().subscribe() } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index babd3f1f50..ab9715c3a4 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -71,7 +71,7 @@ use crate::net_report::{IpMappedAddr, QuicConfig}; use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, SendAddr}, - discovery::{Discovery, DiscoveryItem, DiscoverySubscribers, NodeData, UserData}, + discovery::{Discovery, DiscoveryEvent, DiscoverySubscribers, NodeData, UserData}, key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box}, metrics::EndpointMetrics, net_report::{self, IfStateDetails, IpMappedAddresses, Report}, @@ -1857,7 +1857,7 @@ impl Actor { .port_mapper .watch_external_address(); - let mut discovery_events: BoxStream = Box::pin(n0_future::stream::empty()); + let mut discovery_events: BoxStream = Box::pin(n0_future::stream::empty()); if let Some(d) = self.msock.discovery() { if let Some(events) = d.subscribe() { discovery_events = events; @@ -2002,16 +2002,19 @@ impl Actor { // forever like we do with the other branches that yield `Option`s Some(discovery_item) = discovery_events.next() => { trace!("tick: discovery event, address discovered: {discovery_item:?}"); - let provenance = discovery_item.provenance(); - let node_addr = discovery_item.to_node_addr(); - if let Err(e) = self.msock.add_node_addr( - node_addr, - Source::Discovery { - name: provenance.to_string() - }) { + if let DiscoveryEvent::Discovered(discovery_item) = &discovery_item { + let provenance = discovery_item.provenance(); let node_addr = discovery_item.to_node_addr(); - warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}"); + if let Err(e) = self.msock.add_node_addr( + node_addr, + Source::Discovery { + name: provenance.to_string() + }) { + let node_addr = discovery_item.to_node_addr(); + warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}"); + } } + // Send the discovery item to the subscribers of the discovery broadcast stream. self.msock.discovery_subscribers.send(discovery_item); }