diff --git a/node/src/neighborhood/mod.rs b/node/src/neighborhood/mod.rs index be4e0b0cc..549daba51 100644 --- a/node/src/neighborhood/mod.rs +++ b/node/src/neighborhood/mod.rs @@ -259,16 +259,29 @@ impl Handler for Neighborhood { type Result = (); fn handle(&mut self, msg: ConnectionProgressMessage, ctx: &mut Self::Context) -> Self::Result { - self.overall_connection_status.update_connection_stage( - msg.peer_addr, - msg.event.clone(), - self.node_to_ui_recipient_opt - .as_ref() - .expect("UI Gateway is unbound"), - ); - - if msg.event == ConnectionProgressEvent::TcpConnectionSuccessful { - self.send_ask_about_debut_gossip_message(ctx, msg.peer_addr); + if let Ok(connection_progress) = self + .overall_connection_status + .get_connection_progress_by_ip(msg.peer_addr) + { + OverallConnectionStatus::update_connection_stage( + connection_progress, + msg.event.clone(), + ); + match msg.event { + ConnectionProgressEvent::TcpConnectionSuccessful => { + self.send_ask_about_debut_gossip_message(ctx, msg.peer_addr); + } + ConnectionProgressEvent::IntroductionGossipReceived(_) + | ConnectionProgressEvent::StandardGossipReceived => { + self.overall_connection_status + .update_ocs_stage_and_send_message_to_ui( + self.node_to_ui_recipient_opt + .as_ref() + .expect("UI Gateway is unbound"), + ); + } + _ => (), + } } } } @@ -281,19 +294,17 @@ impl Handler for Neighborhood { msg: AskAboutDebutGossipMessage, _ctx: &mut Self::Context, ) -> Self::Result { - let new_connection_progress = self + if let Ok(current_connection_progress) = self .overall_connection_status - .get_connection_progress_by_desc(&msg.prev_connection_progress.initial_node_descriptor); - - if msg.prev_connection_progress == *new_connection_progress { - // No change, hence no response was received - self.overall_connection_status.update_connection_stage( - msg.prev_connection_progress.current_peer_addr, - ConnectionProgressEvent::NoGossipResponseReceived, - self.node_to_ui_recipient_opt - .as_ref() - .expect("UI Gateway is unbound"), - ); + .get_connection_progress_by_desc(&msg.prev_connection_progress.initial_node_descriptor) + { + if msg.prev_connection_progress == *current_connection_progress { + // No change, hence no response was received + OverallConnectionStatus::update_connection_stage( + current_connection_progress, + ConnectionProgressEvent::NoGossipResponseReceived, + ); + } } } } @@ -1171,7 +1182,9 @@ impl Neighborhood { prev_connection_progress: self .overall_connection_status .get_connection_progress_by_ip(current_peer_addr) + .unwrap() .clone(), + // TODO: Do Something with the error - "Cannot send AskAboutDebutGossipMessage for peer with IP Address: {}" }; self.tools.notify_later_ask_about_gossip.notify_later( message, @@ -1331,6 +1344,7 @@ pub fn regenerate_signed_gossip( #[cfg(test)] mod tests { + use std::any::TypeId; use std::cell::RefCell; use std::convert::TryInto; use std::net::{IpAddr, SocketAddr}; @@ -1343,14 +1357,16 @@ mod tests { use actix::Recipient; use actix::System; use itertools::Itertools; + use serde_cbor; use std::time::Duration; use tokio::prelude::Future; use masq_lib::constants::{DEFAULT_CHAIN, TLS_PORT}; + use masq_lib::messages::{ToMessageBody, UiConnectionChangeBroadcast, UiConnectionChangeStage}; use masq_lib::test_utils::utils::{ensure_node_home_directory_exists, TEST_DEFAULT_CHAIN}; - use masq_lib::ui_gateway::MessageBody; use masq_lib::ui_gateway::MessagePath::Conversation; + use masq_lib::ui_gateway::{MessageBody, MessageTarget}; use masq_lib::utils::running_test; use crate::db_config::persistent_configuration::PersistentConfigError; @@ -1374,8 +1390,9 @@ mod tests { use crate::test_utils::make_meaningless_route; use crate::test_utils::make_wallet; use crate::test_utils::neighborhood_test_utils::{ - db_from_node, make_global_cryptde_node_record, make_node_descriptor, make_node_record, - make_node_record_f, make_node_to_ui_recipient, neighborhood_from_nodes, + db_from_node, make_global_cryptde_node_record, make_ip, make_node, make_node_descriptor, + make_node_record, make_node_record_f, make_node_to_ui_recipient, + make_recipient_and_recording_arc, neighborhood_from_nodes, }; use crate::test_utils::persistent_configuration_mock::PersistentConfigurationMock; use crate::test_utils::rate_pack; @@ -1393,7 +1410,9 @@ mod tests { use crate::neighborhood::overall_connection_status::ConnectionStageErrors::{ NoGossipResponseReceived, PassLoopFound, TcpConnectionFailed, }; - use crate::neighborhood::overall_connection_status::{ConnectionProgress, ConnectionStage}; + use crate::neighborhood::overall_connection_status::{ + ConnectionProgress, ConnectionStage, OverallConnectionStage, + }; use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler}; impl Handler> for Neighborhood { @@ -1634,11 +1653,38 @@ mod tests { ); } + #[test] + pub fn neighborhood_doesn_t_do_anything_if_it_receives_a_cpm_with_an_unknown_peer_addr() { + let known_peer = make_ip(1); + let unknown_peer = make_ip(2); + let node_descriptor = make_node_descriptor(known_peer); + let subject = make_subject_from_node_descriptor( + &node_descriptor, + "neighborhood_doesn_t_do_anything_if_it_receives_a_cpm_with_an_unknown_peer_addr", + ); + let initial_ocs = subject.overall_connection_status.clone(); + let addr = subject.start(); + let cpm_recipient = addr.clone().recipient::(); + let system = System::new("testing"); + let cpm = ConnectionProgressMessage { + peer_addr: unknown_peer, + event: ConnectionProgressEvent::TcpConnectionSuccessful, + }; + + cpm_recipient.try_send(cpm).unwrap(); + + let assertions = Box::new(move |actor: &mut Neighborhood| { + assert_eq!(actor.overall_connection_status, initial_ocs); + }); + addr.try_send(AssertionsMessage { assertions }).unwrap(); + System::current().stop(); + assert_eq!(system.run(), 0); + } + #[test] pub fn neighborhood_handles_connection_progress_message_with_tcp_connection_established() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = make_subject_from_node_descriptor( &node_descriptor, "neighborhood_handles_connection_progress_message_with_tcp_connection_established", @@ -1667,8 +1713,12 @@ mod tests { let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![beginning_connection_progress_clone] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::NotConnected, + progress: vec![beginning_connection_progress_clone] + } ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); @@ -1690,16 +1740,21 @@ mod tests { #[test] fn ask_about_debut_gossip_message_handles_timeout_in_case_no_response_is_received() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = make_subject_from_node_descriptor( &node_descriptor, "ask_about_debut_gossip_message_handles_timeout_in_case_no_response_is_received", ); - subject.overall_connection_status.update_connection_stage( - node_ip_addr, + let (node_to_ui_recipient, node_to_ui_recording_arc) = + make_recipient_and_recording_arc(Some(TypeId::of::())); + subject.node_to_ui_recipient_opt = Some(node_to_ui_recipient); + let connection_progress_to_modify = subject + .overall_connection_status + .get_connection_progress_by_ip(node_ip_addr) + .unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - subject.node_to_ui_recipient_opt.as_ref().unwrap(), ); let beginning_connection_progress = ConnectionProgress { initial_node_descriptor: node_descriptor.clone(), @@ -1717,28 +1772,64 @@ mod tests { let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![ConnectionProgress { - initial_node_descriptor: node_descriptor, - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::Failed(NoGossipResponseReceived), - }] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::NotConnected, + progress: vec![ConnectionProgress { + initial_node_descriptor: node_descriptor, + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::Failed(NoGossipResponseReceived), + }] + } ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); System::current().stop(); + let node_to_ui_mutex = node_to_ui_recording_arc.lock().unwrap(); + let node_to_ui_message_opt = node_to_ui_mutex.get_record_opt::(0); + assert_eq!(system.run(), 0); + assert_eq!(node_to_ui_message_opt, None); + } + + #[test] + pub fn it_doesn_t_cause_a_panic_if_neighborhood_receives_ask_about_debut_message_from_unknown_descriptor( + ) { + let (_known_ip, known_desc) = make_node(1); + let (unknown_ip, unknown_desc) = make_node(2); + let subject = make_subject_from_node_descriptor(&known_desc, "it_doesn_t_cause_a_panic_if_neighborhood_receives_ask_about_debut_message_from_unknown_descriptor"); + let initial_ocs = subject.overall_connection_status.clone(); + let addr = subject.start(); + let recipient: Recipient = addr.clone().recipient(); + let aadgrm = AskAboutDebutGossipMessage { + prev_connection_progress: ConnectionProgress { + initial_node_descriptor: unknown_desc.clone(), + current_peer_addr: unknown_ip, + connection_stage: ConnectionStage::TcpConnectionEstablished, + }, + }; + let system = System::new("testing"); + + recipient.try_send(aadgrm).unwrap(); + + let assertions = Box::new(move |actor: &mut Neighborhood| { + assert_eq!(actor.overall_connection_status, initial_ocs); + }); + addr.try_send(AssertionsMessage { assertions }).unwrap(); + System::current().stop(); assert_eq!(system.run(), 0); } #[test] pub fn neighborhood_handles_connection_progress_message_with_tcp_connection_failed() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); - let subject = make_subject_from_node_descriptor( + let (node_ip_addr, node_descriptor) = make_node(1); + let mut subject = make_subject_from_node_descriptor( &node_descriptor, "neighborhood_handles_connection_progress_message_with_tcp_connection_failed", ); + let (node_to_ui_recipient, node_to_ui_recording_arc) = make_node_to_ui_recipient(); + subject.node_to_ui_recipient_opt = Some(node_to_ui_recipient); let addr = subject.start(); let cpm_recipient = addr.clone().recipient(); let system = System::new("testing"); @@ -1751,37 +1842,46 @@ mod tests { let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![ConnectionProgress { - initial_node_descriptor: node_descriptor.clone(), - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::Failed(TcpConnectionFailed) - }] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::NotConnected, + progress: vec![ConnectionProgress { + initial_node_descriptor: node_descriptor.clone(), + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::Failed(TcpConnectionFailed) + }] + } ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); System::current().stop(); + let node_to_ui_mutex = node_to_ui_recording_arc.lock().unwrap(); + let node_to_ui_message_opt = node_to_ui_mutex.get_record_opt::(0); assert_eq!(system.run(), 0); + assert_eq!(node_to_ui_message_opt, None); } #[test] fn neighborhood_handles_a_connection_progress_message_with_pass_gossip_received() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = make_subject_from_node_descriptor( &node_descriptor, "neighborhood_handles_a_connection_progress_message_with_pass_gossip_received", ); - subject.overall_connection_status.update_connection_stage( - node_ip_addr, + let connection_progress_to_modify = subject + .overall_connection_status + .get_connection_progress_by_ip(node_ip_addr) + .unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - subject.node_to_ui_recipient_opt.as_ref().unwrap(), ); let addr = subject.start(); let cpm_recipient = addr.clone().recipient(); let system = System::new("testing"); - let new_pass_target = IpAddr::from_str("10.20.30.40").unwrap(); + let new_pass_target = make_ip(2); let connection_progress_message = ConnectionProgressMessage { peer_addr: node_ip_addr, event: ConnectionProgressEvent::PassGossipReceived(new_pass_target), @@ -1791,12 +1891,16 @@ mod tests { let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![ConnectionProgress { - initial_node_descriptor: node_descriptor.clone(), - current_peer_addr: new_pass_target, - connection_stage: ConnectionStage::StageZero - }] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::NotConnected, + progress: vec![ConnectionProgress { + initial_node_descriptor: node_descriptor.clone(), + current_peer_addr: new_pass_target, + connection_stage: ConnectionStage::StageZero + }] + } ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); @@ -1807,16 +1911,18 @@ mod tests { #[test] fn neighborhood_handles_a_connection_progress_message_with_pass_loop_found() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = make_subject_from_node_descriptor( &node_descriptor, "neighborhood_handles_a_connection_progress_message_with_pass_loop_found", ); - subject.overall_connection_status.update_connection_stage( - node_ip_addr, + let connection_progress_to_modify = subject + .overall_connection_status + .get_connection_progress_by_ip(node_ip_addr) + .unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - subject.node_to_ui_recipient_opt.as_ref().unwrap(), ); let addr = subject.start(); let cpm_recipient = addr.clone().recipient(); @@ -1830,12 +1936,16 @@ mod tests { let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![ConnectionProgress { - initial_node_descriptor: node_descriptor.clone(), - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::Failed(PassLoopFound) - }] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::NotConnected, + progress: vec![ConnectionProgress { + initial_node_descriptor: node_descriptor.clone(), + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::Failed(PassLoopFound) + }] + } ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); @@ -1846,56 +1956,81 @@ mod tests { #[test] fn neighborhood_handles_a_connection_progress_message_with_introduction_gossip_received() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = make_subject_from_node_descriptor( &node_descriptor, "neighborhood_handles_a_connection_progress_message_with_introduction_gossip_received", ); - subject.overall_connection_status.update_connection_stage( - node_ip_addr, + let (node_to_ui_recipient, node_to_ui_recording_arc) = + make_recipient_and_recording_arc(Some(TypeId::of::())); + subject.node_to_ui_recipient_opt = Some(node_to_ui_recipient); + let connection_progress_to_modify = subject + .overall_connection_status + .get_connection_progress_by_ip(node_ip_addr) + .unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - subject.node_to_ui_recipient_opt.as_ref().unwrap(), ); let addr = subject.start(); let cpm_recipient = addr.clone().recipient(); let system = System::new("testing"); - let new_node = IpAddr::from_str("10.20.30.40").unwrap(); let connection_progress_message = ConnectionProgressMessage { peer_addr: node_ip_addr, - event: ConnectionProgressEvent::IntroductionGossipReceived(new_node), + event: ConnectionProgressEvent::IntroductionGossipReceived(make_ip(2)), }; cpm_recipient.try_send(connection_progress_message).unwrap(); let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![ConnectionProgress { - initial_node_descriptor: node_descriptor.clone(), - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::NeighborshipEstablished - }] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::ConnectedToNeighbor, + progress: vec![ConnectionProgress { + initial_node_descriptor: node_descriptor.clone(), + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::NeighborshipEstablished + }] + } ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); - System::current().stop(); assert_eq!(system.run(), 0); + let node_to_ui_mutex = node_to_ui_recording_arc.lock().unwrap(); + let node_to_ui_message_opt = node_to_ui_mutex.get_record_opt::(0); + assert_eq!(node_to_ui_mutex.len(), 1); + assert_eq!( + node_to_ui_message_opt, + Some(&NodeToUiMessage { + target: MessageTarget::AllClients, + body: UiConnectionChangeBroadcast { + stage: UiConnectionChangeStage::ConnectedToNeighbor + } + .tmb(0) + }) + ); } #[test] fn neighborhood_handles_a_connection_progress_message_with_standard_gossip_received() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = make_subject_from_node_descriptor( &node_descriptor, "neighborhood_handles_a_connection_progress_message_with_standard_gossip_received", ); - subject.overall_connection_status.update_connection_stage( - node_ip_addr, + let (node_to_ui_recipient, node_to_ui_recording_arc) = + make_recipient_and_recording_arc(Some(TypeId::of::())); + subject.node_to_ui_recipient_opt = Some(node_to_ui_recipient); + let connection_progress_to_modify = subject + .overall_connection_status + .get_connection_progress_by_ip(node_ip_addr) + .unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - subject.node_to_ui_recipient_opt.as_ref().unwrap(), ); let addr = subject.start(); let cpm_recipient = addr.clone().recipient(); @@ -1909,32 +2044,50 @@ mod tests { let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![ConnectionProgress { - initial_node_descriptor: node_descriptor.clone(), - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::NeighborshipEstablished - }] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::ConnectedToNeighbor, + progress: vec![ConnectionProgress { + initial_node_descriptor: node_descriptor.clone(), + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::NeighborshipEstablished + }] + } ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); - System::current().stop(); assert_eq!(system.run(), 0); + let node_to_ui_mutex = node_to_ui_recording_arc.lock().unwrap(); + let node_to_ui_message_opt = node_to_ui_mutex.get_record_opt::(0); + assert_eq!(node_to_ui_mutex.len(), 1); + assert_eq!( + node_to_ui_message_opt, + Some(&NodeToUiMessage { + target: MessageTarget::AllClients, + body: UiConnectionChangeBroadcast { + stage: UiConnectionChangeStage::ConnectedToNeighbor + } + .tmb(0) + }) + ); } #[test] fn neighborhood_handles_a_connection_progress_message_with_no_gossip_response_received() { init_test_logging(); - let node_ip_addr = IpAddr::from_str("5.4.3.2").unwrap(); - let node_descriptor = make_node_descriptor(node_ip_addr); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = make_subject_from_node_descriptor( &node_descriptor, "neighborhood_handles_a_connection_progress_message_with_no_gossip_response_received", ); - subject.overall_connection_status.update_connection_stage( - node_ip_addr, + let connection_progress_to_modify = subject + .overall_connection_status + .get_connection_progress_by_ip(node_ip_addr) + .unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - subject.node_to_ui_recipient_opt.as_ref().unwrap(), ); let addr = subject.start(); let cpm_recipient = addr.clone().recipient(); @@ -1948,12 +2101,81 @@ mod tests { let assertions = Box::new(move |actor: &mut Neighborhood| { assert_eq!( - actor.overall_connection_status.progress, - vec![ConnectionProgress { - initial_node_descriptor: node_descriptor.clone(), - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::Failed(NoGossipResponseReceived) - }] + actor.overall_connection_status, + OverallConnectionStatus { + can_make_routes: false, + stage: OverallConnectionStage::NotConnected, + progress: vec![ConnectionProgress { + initial_node_descriptor: node_descriptor.clone(), + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::Failed(NoGossipResponseReceived) + }] + } + ); + }); + addr.try_send(AssertionsMessage { assertions }).unwrap(); + System::current().stop(); + assert_eq!(system.run(), 0); + } + + #[test] + pub fn progress_in_the_stage_of_overall_connection_status_made_by_one_cpm_is_not_overriden_by_the_other( + ) { + init_test_logging(); + let peer_1 = make_ip(1); + let peer_2 = make_ip(2); + let initial_node_descriptors = + vec![make_node_descriptor(peer_1), make_node_descriptor(peer_2)]; + let neighborhood_config = NeighborhoodConfig { + mode: NeighborhoodMode::Standard( + NodeAddr::new(&make_ip(3), &[1234]), + initial_node_descriptors, + rate_pack(100), + ), + }; + let mut subject = Neighborhood::new( + main_cryptde(), + &bc_from_nc_plus( + neighborhood_config, + make_wallet("earning"), + None, + "progress_in_the_stage_of_overall_connection_status_made_by_one_cpm_is_not_overriden_by_the_other"), + ); + let (node_to_ui_recipient, _) = make_node_to_ui_recipient(); + subject.node_to_ui_recipient_opt = Some(node_to_ui_recipient); + let addr = subject.start(); + let cpm_recipient = addr.clone().recipient(); + let system = System::new("testing"); + cpm_recipient + .try_send(ConnectionProgressMessage { + peer_addr: peer_1, + event: ConnectionProgressEvent::TcpConnectionSuccessful, + }) + .unwrap(); + cpm_recipient + .try_send(ConnectionProgressMessage { + peer_addr: peer_1, + event: ConnectionProgressEvent::IntroductionGossipReceived(make_ip(4)), + }) + .unwrap(); + cpm_recipient + .try_send(ConnectionProgressMessage { + peer_addr: peer_2, + event: ConnectionProgressEvent::TcpConnectionSuccessful, + }) + .unwrap(); + + cpm_recipient + .try_send(ConnectionProgressMessage { + peer_addr: peer_2, + event: ConnectionProgressEvent::PassGossipReceived(make_ip(5)), + }) + .unwrap(); + + let assertions = Box::new(move |actor: &mut Neighborhood| { + assert_eq!( + actor.overall_connection_status.stage(), + OverallConnectionStage::ConnectedToNeighbor ); }); addr.try_send(AssertionsMessage { assertions }).unwrap(); diff --git a/node/src/neighborhood/overall_connection_status.rs b/node/src/neighborhood/overall_connection_status.rs index 507eeb4b4..7fae3d666 100644 --- a/node/src/neighborhood/overall_connection_status.rs +++ b/node/src/neighborhood/overall_connection_status.rs @@ -8,6 +8,7 @@ use actix::Recipient; use masq_lib::messages::{ToMessageBody, UiConnectionChangeBroadcast, UiConnectionChangeStage}; use masq_lib::ui_gateway::{MessageTarget, NodeToUiMessage}; use std::net::IpAddr; +use std::string::String; #[derive(PartialEq, Debug, Clone)] pub enum ConnectionStageErrors { @@ -95,7 +96,7 @@ impl ConnectionProgress { } #[derive(PartialEq, Debug, Copy, Clone)] -enum OverallConnectionStage { +pub enum OverallConnectionStage { NotConnected = 0, ConnectedToNeighbor = 1, // When an Introduction or Standard Gossip (acceptance) is received ThreeHopsRouteFound = 2, // Data can be relayed once this stage is reached @@ -117,12 +118,12 @@ impl From for UiConnectionChangeStage { } } -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Clone)] pub struct OverallConnectionStatus { // The check_connectedness() updates the boolean when three hops route is found - can_make_routes: bool, + pub can_make_routes: bool, // Transition depends on the ConnectionProgressMessage & check_connectedness(), they may not be in sync - stage: OverallConnectionStage, + pub stage: OverallConnectionStage, // Corresponds to the initial_node_descriptors, that are entered by the user using --neighbors pub progress: Vec, } @@ -147,51 +148,47 @@ impl OverallConnectionStatus { .map(|connection_progress| &connection_progress.initial_node_descriptor) } - pub fn get_connection_progress_by_ip(&mut self, peer_addr: IpAddr) -> &mut ConnectionProgress { - let connection_progress_to_modify = self + pub fn get_connection_progress_by_ip( + &mut self, + peer_addr: IpAddr, + ) -> Result<&mut ConnectionProgress, String> { + let connection_progress_res = self .progress .iter_mut() - .find(|connection_progress| connection_progress.current_peer_addr == peer_addr) - .unwrap_or_else(|| { - panic!( - "Unable to find the Node in connections with IP Address: {}", - peer_addr - ) - }); - - connection_progress_to_modify + .find(|connection_progress| connection_progress.current_peer_addr == peer_addr); + + match connection_progress_res { + Some(connection_progress) => Ok(connection_progress), + None => Err(format!( + "Unable to find the Node in connections with IP Address: {}", + peer_addr + )), + } } pub fn get_connection_progress_by_desc( - &self, + &mut self, initial_node_descriptor: &NodeDescriptor, - ) -> &ConnectionProgress { - let connection_progress = self - .progress - .iter() - .find(|connection_progress| { - &connection_progress.initial_node_descriptor == initial_node_descriptor - }) - .unwrap_or_else(|| { - panic!( - "Unable to find the Node in connections with Node Descriptor: {:?}", - initial_node_descriptor - ) - }); - - connection_progress + ) -> Result<&mut ConnectionProgress, String> { + let connection_progress = self.progress.iter_mut().find(|connection_progress| { + &connection_progress.initial_node_descriptor == initial_node_descriptor + }); + + match connection_progress { + Some(connection_progress) => Ok(connection_progress), + None => Err(format!( + "Unable to find the Node in connections with Node Descriptor: {:?}", + initial_node_descriptor + )), + } } pub fn update_connection_stage( - &mut self, - peer_addr: IpAddr, + connection_progress: &mut ConnectionProgress, event: ConnectionProgressEvent, - node_to_ui_recipient: &Recipient, ) { - let connection_progress_to_modify = self.get_connection_progress_by_ip(peer_addr); - let mut modify_connection_progress = - |stage: ConnectionStage| connection_progress_to_modify.update_stage(stage); + |stage: ConnectionStage| connection_progress.update_stage(stage); match event { ConnectionProgressEvent::TcpConnectionSuccessful => { @@ -202,14 +199,12 @@ impl OverallConnectionStatus { } ConnectionProgressEvent::IntroductionGossipReceived(_new_node) => { modify_connection_progress(ConnectionStage::NeighborshipEstablished); - self.update_stage_of_overall_connection_status(node_to_ui_recipient); } ConnectionProgressEvent::StandardGossipReceived => { modify_connection_progress(ConnectionStage::NeighborshipEstablished); - self.update_stage_of_overall_connection_status(node_to_ui_recipient); } ConnectionProgressEvent::PassGossipReceived(new_pass_target) => { - connection_progress_to_modify.handle_pass_gossip(new_pass_target); + connection_progress.handle_pass_gossip(new_pass_target); } ConnectionProgressEvent::PassLoopFound => { modify_connection_progress(ConnectionStage::Failed(PassLoopFound)); @@ -220,7 +215,7 @@ impl OverallConnectionStatus { } } - fn update_stage_of_overall_connection_status( + pub fn update_ocs_stage_and_send_message_to_ui( &mut self, node_to_ui_recipient: &Recipient, ) { @@ -269,6 +264,10 @@ impl OverallConnectionStatus { pub fn update_can_make_routes(&mut self, can_make_routes: bool) { self.can_make_routes = can_make_routes; } + + pub fn stage(&self) -> OverallConnectionStage { + self.stage + } } #[cfg(test)] @@ -279,7 +278,7 @@ mod tests { }; use crate::neighborhood::PublicKey; use crate::test_utils::neighborhood_test_utils::{ - make_ip, make_node_and_recipient, make_node_descriptor, make_node_to_ui_recipient, + make_ip, make_node, make_node_descriptor, make_node_to_ui_recipient, }; use actix::System; use masq_lib::blockchains::chains::Chain; @@ -382,7 +381,7 @@ mod tests { } #[test] - fn can_receive_mut_ref_of_connection_progress_from_peer_addr() { + fn can_recieve_a_result_of_connection_progress_from_peer_addr() { let peer_1_ip = make_ip(1); let peer_2_ip = make_ip(2); let desc_1 = make_node_descriptor(peer_1_ip); @@ -391,13 +390,28 @@ mod tests { let mut subject = OverallConnectionStatus::new(initial_node_descriptors); + let res_1 = subject.get_connection_progress_by_ip(peer_1_ip); + assert_eq!(res_1, Ok(&mut ConnectionProgress::new(desc_1))); + let res_2 = subject.get_connection_progress_by_ip(peer_2_ip); + assert_eq!(res_2, Ok(&mut ConnectionProgress::new(desc_2))); + } + + #[test] + fn receives_an_error_in_receiving_connection_progress_from_unknown_ip_address() { + let peer = make_ip(1); + let desc = make_node_descriptor(peer); + let initial_node_descriptors = vec![desc]; + let unknown_peer = make_ip(2); + + let mut subject = OverallConnectionStatus::new(initial_node_descriptors); + + let res = subject.get_connection_progress_by_ip(unknown_peer); assert_eq!( - subject.get_connection_progress_by_ip(peer_1_ip), - &mut ConnectionProgress::new(desc_1) - ); - assert_eq!( - subject.get_connection_progress_by_ip(peer_2_ip), - &mut ConnectionProgress::new(desc_2) + res, + Err(format!( + "Unable to find the Node in connections with IP Address: {}", + unknown_peer + )) ); } @@ -407,15 +421,32 @@ mod tests { let desc_2 = make_node_descriptor(make_ip(2)); let initial_node_descriptors = vec![desc_1.clone(), desc_2.clone()]; - let subject = OverallConnectionStatus::new(initial_node_descriptors); + let mut subject = OverallConnectionStatus::new(initial_node_descriptors); assert_eq!( subject.get_connection_progress_by_desc(&desc_1), - &ConnectionProgress::new(desc_1) + Ok(&mut ConnectionProgress::new(desc_1)) ); assert_eq!( subject.get_connection_progress_by_desc(&desc_2), - &ConnectionProgress::new(desc_2) + Ok(&mut ConnectionProgress::new(desc_2)) + ); + } + + #[test] + fn receives_an_error_in_receiving_connection_progress_from_unknown_initial_node_desc() { + let known_desc = make_node_descriptor(make_ip(1)); + let unknown_desc = make_node_descriptor(make_ip(2)); + let initial_node_descriptors = vec![known_desc]; + + let mut subject = OverallConnectionStatus::new(initial_node_descriptors); + + assert_eq!( + subject.get_connection_progress_by_desc(&unknown_desc), + Err(format!( + "Unable to find the Node in connections with Node Descriptor: {:?}", + unknown_desc + )) ); } @@ -447,13 +478,12 @@ mod tests { #[test] fn updates_the_connection_stage_to_tcp_connection_established() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + subject.get_connection_progress_by_ip(node_ip_addr).unwrap(), ConnectionProgressEvent::TcpConnectionSuccessful, - &recipient, ); assert_eq!( @@ -472,13 +502,14 @@ mod tests { #[test] fn updates_the_connection_stage_to_failed_when_tcp_connection_fails() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); + let connection_progress_to_modify = + subject.get_connection_progress_by_ip(node_ip_addr).unwrap(); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionFailed, - &recipient, ); assert_eq!( @@ -496,80 +527,71 @@ mod tests { } #[test] - fn updates_the_connection_stage_to_neighborship_established() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + fn updates_the_connection_stage_to_neighborship_established_when_introduction_gossip_is_received( + ) { + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); - subject.update_connection_stage( - node_ip_addr, + let connection_progress = subject.get_connection_progress_by_ip(node_ip_addr).unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress, ConnectionProgressEvent::TcpConnectionSuccessful, - &recipient, ); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + connection_progress, ConnectionProgressEvent::IntroductionGossipReceived(make_ip(1)), - &recipient, ); assert_eq!( - subject, - OverallConnectionStatus { - can_make_routes: false, - stage: OverallConnectionStage::ConnectedToNeighbor, - progress: vec![ConnectionProgress { - initial_node_descriptor: node_descriptor, - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::NeighborshipEstablished - }], + connection_progress, + &mut ConnectionProgress { + initial_node_descriptor: node_descriptor, + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::NeighborshipEstablished } ) } #[test] fn updates_the_connection_stage_to_neighborship_established_when_standard_gossip_is_received() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); - subject.update_connection_stage( - node_ip_addr, + let connection_progress = subject.get_connection_progress_by_ip(node_ip_addr).unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress, ConnectionProgressEvent::TcpConnectionSuccessful, - &recipient, ); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + connection_progress, ConnectionProgressEvent::StandardGossipReceived, - &recipient, ); assert_eq!( - subject, - OverallConnectionStatus { - can_make_routes: false, - stage: OverallConnectionStage::ConnectedToNeighbor, - progress: vec![ConnectionProgress { - initial_node_descriptor: node_descriptor, - current_peer_addr: node_ip_addr, - connection_stage: ConnectionStage::NeighborshipEstablished - }], + connection_progress, + &mut ConnectionProgress { + initial_node_descriptor: node_descriptor, + current_peer_addr: node_ip_addr, + connection_stage: ConnectionStage::NeighborshipEstablished } ) } #[test] fn updates_the_connection_stage_to_stage_zero_when_pass_gossip_is_received() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); let pass_target = make_ip(1); - subject.update_connection_stage( - node_ip_addr, + let connection_progress_to_modify = + subject.get_connection_progress_by_ip(node_ip_addr).unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - &recipient, ); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::PassGossipReceived(pass_target), - &recipient, ); assert_eq!( @@ -588,18 +610,18 @@ mod tests { #[test] fn updates_connection_stage_to_failed_when_pass_loop_is_found() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); - subject.update_connection_stage( - node_ip_addr, + let connection_progress_to_modify = + subject.get_connection_progress_by_ip(node_ip_addr).unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - &recipient, ); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::PassLoopFound, - &recipient, ); assert_eq!( @@ -618,18 +640,18 @@ mod tests { #[test] fn updates_connection_stage_to_failed_when_no_gossip_response_is_received() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); - subject.update_connection_stage( - node_ip_addr, + let connection_progress_to_modify = + subject.get_connection_progress_by_ip(node_ip_addr).unwrap(); + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::TcpConnectionSuccessful, - &recipient, ); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::NoGossipResponseReceived, - &recipient, ); assert_eq!( @@ -646,20 +668,6 @@ mod tests { ) } - #[test] - #[should_panic(expected = "Unable to find the Node in connections with IP Address: 1.1.1.1")] - fn panics_at_updating_the_connection_stage_if_a_node_is_not_a_part_of_connections() { - let (_node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); - let mut subject = OverallConnectionStatus::new(vec![node_descriptor.clone()]); - let non_existing_node_s_ip_addr = make_ip(1); - - subject.update_connection_stage( - non_existing_node_s_ip_addr, - ConnectionProgressEvent::TcpConnectionSuccessful, - &recipient, - ); - } - #[test] fn connection_stage_can_be_converted_to_number() { assert_eq!(usize::try_from(&ConnectionStage::StageZero), Ok(0)); @@ -680,13 +688,14 @@ mod tests { #[test] #[should_panic(expected = "Can't update the stage from StageZero to NeighborshipEstablished")] fn can_t_establish_neighborhsip_without_having_a_tcp_connection() { - let (node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); + let (node_ip_addr, node_descriptor) = make_node(1); let mut subject = OverallConnectionStatus::new(vec![node_descriptor]); + let connection_progress_to_modify = + subject.get_connection_progress_by_ip(node_ip_addr).unwrap(); - subject.update_connection_stage( - node_ip_addr, + OverallConnectionStatus::update_connection_stage( + connection_progress_to_modify, ConnectionProgressEvent::IntroductionGossipReceived(make_ip(1)), - &recipient, ); } @@ -725,7 +734,7 @@ mod tests { #[test] fn we_can_ask_about_can_make_routes() { - let (_node_ip_addr, node_descriptor, _recipient) = make_node_and_recipient(); + let node_descriptor = make_node_descriptor(make_ip(1)); let subject = OverallConnectionStatus::new(vec![node_descriptor]); let can_make_routes = subject.can_make_routes(); @@ -735,7 +744,7 @@ mod tests { #[test] fn can_update_the_boolean_can_make_routes() { - let (_node_ip_addr, node_descriptor, _recipient) = make_node_and_recipient(); + let node_descriptor = make_node_descriptor(make_ip(1)); let mut subject = OverallConnectionStatus::new(vec![node_descriptor]); let can_make_routes_initially = subject.can_make_routes(); @@ -747,40 +756,16 @@ mod tests { } #[test] - fn updates_from_not_connected_to_connected_to_neighbor_in_case_flag_is_false() { - let (_node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); - let mut subject = OverallConnectionStatus::new(vec![node_descriptor]); - subject.update_can_make_routes(false); - - subject.update_stage_of_overall_connection_status(&recipient); - - assert_eq!(subject.stage, OverallConnectionStage::ConnectedToNeighbor); - } - - #[test] - fn updates_from_not_connected_to_three_hops_route_found_in_case_flag_is_true() { - let (_node_ip_addr, node_descriptor, recipient) = make_node_and_recipient(); - let mut subject = OverallConnectionStatus::new(vec![node_descriptor]); - subject.update_can_make_routes(true); - - subject.update_stage_of_overall_connection_status(&recipient); - - assert_eq!(subject.stage, OverallConnectionStage::ThreeHopsRouteFound); - } - - #[test] - fn updates_the_stage_to_three_hops_route_found_in_case_introduction_gossip_is_received_and_flag_is_true( + fn updates_the_stage_to_three_hops_route_found_in_case_introduction_or_standard_gossip_is_received_and_flag_is_true( ) { let initial_stage = OverallConnectionStage::NotConnected; - let event = ConnectionProgressEvent::IntroductionGossipReceived(make_ip(1)); let can_make_routes = true; let (stage, message_opt) = - stage_and_ui_message_by_connection_progress_event_and_can_make_routes( + assert_stage_and_node_to_ui_message( initial_stage, - event, can_make_routes, - "updates_the_stage_to_three_hops_route_found_in_case_introduction_gossip_is_received_and_flag_is_true" + "updates_the_stage_to_three_hops_route_found_in_case_introduction_or_standard_gossip_is_received_and_flag_is_true" ); assert_eq!(stage, OverallConnectionStage::ThreeHopsRouteFound); @@ -797,18 +782,16 @@ mod tests { } #[test] - fn updates_the_stage_to_connected_to_neighbor_in_case_introduction_gossip_is_received_and_flag_is_false( + fn updates_the_stage_to_connected_to_neighbor_in_case_introduction_or_standard_gossip_is_received_and_flag_is_false( ) { let initial_stage = OverallConnectionStage::NotConnected; - let event = ConnectionProgressEvent::IntroductionGossipReceived(make_ip(1)); let can_make_routes = false; let (stage, message_opt) = - stage_and_ui_message_by_connection_progress_event_and_can_make_routes( + assert_stage_and_node_to_ui_message( initial_stage, - event, can_make_routes, - "updates_the_stage_to_connected_to_neighbor_in_case_introduction_gossip_is_received_and_flag_is_false" + "updates_the_stage_to_connected_to_neighbor_in_case_introduction_or_standard_gossip_is_received_and_flag_is_false" ); assert_eq!(stage, OverallConnectionStage::ConnectedToNeighbor); @@ -825,74 +808,16 @@ mod tests { } #[test] - fn updates_the_stage_to_three_hops_route_found_in_case_standard_gossip_is_received_and_flag_is_true( - ) { - let initial_stage = OverallConnectionStage::NotConnected; - let event = ConnectionProgressEvent::StandardGossipReceived; - let can_make_routes = true; - - let (stage, message_opt) = - stage_and_ui_message_by_connection_progress_event_and_can_make_routes( - initial_stage, - event, - can_make_routes, - "updates_the_stage_to_three_hops_route_found_in_case_standard_gossip_is_received_and_flag_is_true" - ); - - assert_eq!(stage, OverallConnectionStage::ThreeHopsRouteFound); - assert_eq!( - message_opt, - Some(NodeToUiMessage { - target: MessageTarget::AllClients, - body: UiConnectionChangeBroadcast { - stage: UiConnectionChangeStage::ThreeHopsRouteFound - } - .tmb(0) - }) - ); - } - - #[test] - fn updates_the_stage_to_connected_to_neighbor_in_case_standard_gossip_is_received_and_flag_is_false( + fn doesn_t_send_message_to_the_ui_in_case_introduction_or_standard_gossip_is_received_but_stage_hasn_t_updated( ) { - let initial_stage = OverallConnectionStage::NotConnected; - let event = ConnectionProgressEvent::StandardGossipReceived; - let can_make_routes = false; - - let (stage, message_opt) = - stage_and_ui_message_by_connection_progress_event_and_can_make_routes( - initial_stage, - event, - can_make_routes, - "updates_the_stage_to_connected_to_neighbor_in_case_standard_gossip_is_received_and_flag_is_false" - ); - - assert_eq!(stage, OverallConnectionStage::ConnectedToNeighbor); - assert_eq!( - message_opt, - Some(NodeToUiMessage { - target: MessageTarget::AllClients, - body: UiConnectionChangeBroadcast { - stage: UiConnectionChangeStage::ConnectedToNeighbor - } - .tmb(0) - }) - ); - } - - #[test] - fn doesn_t_send_message_to_the_ui_in_case_gossip_is_received_but_stage_hasn_t_updated() { let initial_stage = OverallConnectionStage::ConnectedToNeighbor; - let event = ConnectionProgressEvent::StandardGossipReceived; let can_make_routes = false; - let (stage, message_opt) = - stage_and_ui_message_by_connection_progress_event_and_can_make_routes( - initial_stage, - event, - can_make_routes, - "doesn_t_send_message_to_the_ui_in_case_gossip_is_received_but_stage_hasn_t_updated" - ); + let (stage, message_opt) = assert_stage_and_node_to_ui_message( + initial_stage, + can_make_routes, + "doesn_t_send_message_to_the_ui_in_case_introduction_or_standard_gossip_is_received_but_stage_hasn_t_updated", + ); assert_eq!(stage, initial_stage); assert_eq!(message_opt, None); @@ -902,13 +827,11 @@ mod tests { fn doesn_t_send_a_message_to_ui_in_case_connection_drops_from_three_hops_to_connected_to_neighbor( ) { let initial_stage = OverallConnectionStage::ThreeHopsRouteFound; - let event = ConnectionProgressEvent::StandardGossipReceived; let can_make_routes = false; let (stage, message_opt) = - stage_and_ui_message_by_connection_progress_event_and_can_make_routes( + assert_stage_and_node_to_ui_message( initial_stage, - event, can_make_routes, "doesn_t_send_a_message_to_ui_in_case_connection_drops_from_three_hops_to_connected_to_neighbor" ); @@ -918,68 +841,24 @@ mod tests { } #[test] - fn progress_done_by_one_connection_progress_can_not_be_overridden_by_other_in_overall_connection_progress( - ) { - let ip_addr_1 = make_ip(1); - let ip_addr_2 = make_ip(2); - let mut subject = OverallConnectionStatus::new(vec![ - make_node_descriptor(ip_addr_1), - make_node_descriptor(ip_addr_2), - ]); - let (node_to_ui_recipient, _) = make_node_to_ui_recipient(); - subject.update_connection_stage( - ip_addr_1, - ConnectionProgressEvent::TcpConnectionSuccessful, - &node_to_ui_recipient, - ); - subject.update_connection_stage( - ip_addr_1, - ConnectionProgressEvent::IntroductionGossipReceived(make_ip(3)), - &node_to_ui_recipient, - ); - subject.update_connection_stage( - ip_addr_2, - ConnectionProgressEvent::TcpConnectionSuccessful, - &node_to_ui_recipient, - ); - - subject.update_connection_stage( - ip_addr_2, - ConnectionProgressEvent::PassGossipReceived(make_ip(4)), - &node_to_ui_recipient, - ); - - assert_eq!(subject.stage, OverallConnectionStage::ConnectedToNeighbor); + fn getter_fn_for_the_stage_of_overall_connection_status_exists() { + let subject = OverallConnectionStatus::new(vec![make_node_descriptor(make_ip(1))]); + assert_eq!(subject.stage(), OverallConnectionStage::NotConnected); } - fn stage_and_ui_message_by_connection_progress_event_and_can_make_routes( + fn assert_stage_and_node_to_ui_message( initial_stage: OverallConnectionStage, - event: ConnectionProgressEvent, can_make_routes: bool, test_name: &str, ) -> (OverallConnectionStage, Option) { - let (peer_addr, node_descriptor, _) = make_node_and_recipient(); - let mut subject = OverallConnectionStatus::new(vec![node_descriptor]); + let mut subject = + OverallConnectionStatus::new(vec![make_node_descriptor(make_ip(u8::MAX))]); let (node_to_ui_recipient, node_to_ui_recording_arc) = make_node_to_ui_recipient(); subject.stage = initial_stage; - subject.update_connection_stage( - peer_addr, - ConnectionProgressEvent::TcpConnectionSuccessful, - &node_to_ui_recipient, - ); let system = System::new(test_name); subject.update_can_make_routes(can_make_routes); - match event { - ConnectionProgressEvent::StandardGossipReceived - | ConnectionProgressEvent::IntroductionGossipReceived(_) => { - subject.update_connection_stage(peer_addr, event, &node_to_ui_recipient); - } - _ => panic!( - "Can't update to event {:?} because it doesn't leads to Neighborship Established", - event - ), - } + subject.update_ocs_stage_and_send_message_to_ui(&node_to_ui_recipient); System::current().stop(); assert_eq!(system.run(), 0); diff --git a/node/src/test_utils/neighborhood_test_utils.rs b/node/src/test_utils/neighborhood_test_utils.rs index 46a8a35ae..f859cb6da 100644 --- a/node/src/test_utils/neighborhood_test_utils.rs +++ b/node/src/test_utils/neighborhood_test_utils.rs @@ -1,3 +1,4 @@ +use std::any::TypeId; // Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. use crate::bootstrapper::BootstrapperConfig; use crate::neighborhood::gossip::GossipNodeRecord; @@ -292,21 +293,26 @@ pub fn make_node_descriptor(ip_addr: IpAddr) -> NodeDescriptor { } } -pub fn make_node_and_recipient() -> (IpAddr, NodeDescriptor, Recipient) { - let ip_addr = make_ip(u8::MAX); +pub fn make_node(nonce: u8) -> (IpAddr, NodeDescriptor) { + let ip_addr = make_ip(nonce); let node_descriptor = make_node_descriptor(ip_addr); - let (node_to_ui_recipient, _) = make_node_to_ui_recipient(); - (ip_addr, node_descriptor, node_to_ui_recipient) + (ip_addr, node_descriptor) } -pub fn make_recipient_and_recording_arc() -> (Recipient, Arc>) +pub fn make_recipient_and_recording_arc( + stopping_message: Option, +) -> (Recipient, Arc>) where M: Message + Send, ::Result: Send, Recorder: Handler, { let (recorder, _, recording_arc) = make_recorder(); + let recorder = match stopping_message { + Some(type_id) => recorder.stop_condition(type_id), // No need to write stop message after this + None => recorder, + }; let addr = recorder.start(); let recipient = addr.recipient::(); @@ -314,9 +320,9 @@ where } pub fn make_cpm_recipient() -> (Recipient, Arc>) { - make_recipient_and_recording_arc() + make_recipient_and_recording_arc(None) } pub fn make_node_to_ui_recipient() -> (Recipient, Arc>) { - make_recipient_and_recording_arc() + make_recipient_and_recording_arc(None) } diff --git a/node/src/test_utils/recorder.rs b/node/src/test_utils/recorder.rs index 063eb3360..c30f1ed13 100644 --- a/node/src/test_utils/recorder.rs +++ b/node/src/test_utils/recorder.rs @@ -45,13 +45,17 @@ use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; use crate::sub_lib::ui_gateway::UiGatewaySubs; use crate::test_utils::to_millis; +use crate::test_utils::unshared_test_utils::SystemKillerActor; use actix::Addr; use actix::Context; use actix::Handler; use actix::MessageResult; +use actix::System; use actix::{Actor, Message}; use masq_lib::ui_gateway::{NodeFromUiMessage, NodeToUiMessage}; use std::any::Any; +use std::any::TypeId; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; @@ -62,6 +66,7 @@ pub struct Recorder { recording: Arc>, node_query_responses: Vec>, route_query_responses: Vec>, + expected_count_by_msg_type_opt: Option>, } #[derive(Default)] @@ -84,6 +89,20 @@ macro_rules! recorder_message_handler { fn handle(&mut self, msg: $message_type, _ctx: &mut Self::Context) { self.record(msg); + if let Some(expected_count_by_msg_type) = &mut self.expected_count_by_msg_type_opt { + let type_id = TypeId::of::<$message_type>(); + let count = expected_count_by_msg_type.entry(type_id).or_insert(0); + if *count == 0 { + panic!( + "Received a message, which we were not supposed to receive. {:?}", + stringify!($message_type) + ); + }; + *count -= 1; + if !expected_count_by_msg_type.values().any(|&x| x > 0) { + System::current().stop(); + } + } } } }; @@ -217,6 +236,22 @@ impl Recorder { self.route_query_responses.push(response); self } + + pub fn stop_condition(self, message_type_id: TypeId) -> Recorder { + let mut expected_count_by_messages: HashMap = HashMap::new(); + expected_count_by_messages.insert(message_type_id, 1); + self.stop_after_messages_and_start_system_killer(expected_count_by_messages) + } + + pub fn stop_after_messages_and_start_system_killer( + mut self, + expected_count_by_messages: HashMap, + ) -> Recorder { + let system_killer = SystemKillerActor::new(Duration::from_secs(60)); + system_killer.start(); + self.expected_count_by_msg_type_opt = Some(expected_count_by_messages); + self + } } impl Recording {