@@ -634,11 +634,12 @@ impl Builder {
634634 }
635635 } ;
636636
637- let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
637+ let ( stop_sender , stop_receiver ) = tokio :: sync :: watch :: channel ( ( ) ) ;
638638
639639 Arc :: new ( Node {
640640 runtime,
641- stop_running,
641+ stop_sender,
642+ stop_receiver,
642643 config,
643644 wallet,
644645 tx_sync,
@@ -663,7 +664,8 @@ impl Builder {
663664/// Needs to be initialized and instantiated through [`Builder::build`].
664665pub struct Node {
665666 runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
666- stop_running : Arc < AtomicBool > ,
667+ stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
668+ stop_receiver : tokio:: sync:: watch:: Receiver < ( ) > ,
667669 config : Arc < Config > ,
668670 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
669671 tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -697,8 +699,6 @@ impl Node {
697699
698700 let runtime = tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ;
699701
700- let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
701-
702702 let event_handler = Arc :: new ( EventHandler :: new (
703703 Arc :: clone ( & self . wallet ) ,
704704 Arc :: clone ( & self . event_queue ) ,
@@ -717,31 +717,36 @@ impl Node {
717717 let sync_cman = Arc :: clone ( & self . channel_manager ) ;
718718 let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
719719 let sync_logger = Arc :: clone ( & self . logger ) ;
720- let stop_sync = Arc :: clone ( & stop_running ) ;
720+ let mut stop_sync = self . stop_receiver . clone ( ) ;
721721
722722 std:: thread:: spawn ( move || {
723723 tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
724724 async move {
725+ let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 30 ) ) ;
726+ interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
725727 loop {
726- if stop_sync. load ( Ordering :: Acquire ) {
727- return ;
728- }
729728 let now = Instant :: now ( ) ;
730- match wallet. sync ( ) . await {
731- Ok ( ( ) ) => log_info ! (
732- sync_logger,
733- "Background sync of on-chain wallet finished in {}ms." ,
734- now. elapsed( ) . as_millis( )
735- ) ,
736- Err ( err) => {
737- log_error ! (
738- sync_logger,
739- "Background sync of on-chain wallet failed: {}" ,
740- err
741- )
729+ tokio:: select! {
730+ _ = stop_sync. changed( ) => {
731+ return ;
732+ }
733+ _ = interval. tick( ) => {
734+ match wallet. sync( ) . await {
735+ Ok ( ( ) ) => log_info!(
736+ sync_logger,
737+ "Background sync of on-chain wallet finished in {}ms." ,
738+ now. elapsed( ) . as_millis( )
739+ ) ,
740+ Err ( err) => {
741+ log_error!(
742+ sync_logger,
743+ "Background sync of on-chain wallet failed: {}" ,
744+ err
745+ )
746+ }
747+ }
742748 }
743749 }
744- tokio:: time:: sleep ( Duration :: from_secs ( 20 ) ) . await ;
745750 }
746751 } ,
747752 ) ;
@@ -788,35 +793,40 @@ impl Node {
788793 }
789794
790795 let sync_logger = Arc :: clone ( & self . logger ) ;
791- let stop_sync = Arc :: clone ( & stop_running ) ;
796+ let mut stop_sync = self . stop_receiver . clone ( ) ;
792797 runtime. spawn ( async move {
798+ let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 10 ) ) ;
799+ interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
793800 loop {
794- if stop_sync. load ( Ordering :: Acquire ) {
795- return ;
796- }
797801 let now = Instant :: now ( ) ;
798- let confirmables = vec ! [
799- & * sync_cman as & ( dyn Confirm + Sync + Send ) ,
800- & * sync_cmon as & ( dyn Confirm + Sync + Send ) ,
801- ] ;
802- match tx_sync. sync ( confirmables) . await {
803- Ok ( ( ) ) => log_info ! (
804- sync_logger,
805- "Background sync of Lightning wallet finished in {}ms." ,
806- now. elapsed( ) . as_millis( )
807- ) ,
808- Err ( e) => {
809- log_error ! ( sync_logger, "Background sync of Lightning wallet failed: {}" , e)
802+ tokio:: select! {
803+ _ = stop_sync. changed( ) => {
804+ return ;
805+ }
806+ _ = interval. tick( ) => {
807+ let confirmables = vec![
808+ & * sync_cman as & ( dyn Confirm + Sync + Send ) ,
809+ & * sync_cmon as & ( dyn Confirm + Sync + Send ) ,
810+ ] ;
811+ match tx_sync. sync( confirmables) . await {
812+ Ok ( ( ) ) => log_info!(
813+ sync_logger,
814+ "Background sync of Lightning wallet finished in {}ms." ,
815+ now. elapsed( ) . as_millis( )
816+ ) ,
817+ Err ( e) => {
818+ log_error!( sync_logger, "Background sync of Lightning wallet failed: {}" , e)
819+ }
820+ }
810821 }
811822 }
812- tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
813823 }
814824 } ) ;
815825
816826 if let Some ( listening_address) = & self . config . listening_address {
817827 // Setup networking
818828 let peer_manager_connection_handler = Arc :: clone ( & self . peer_manager ) ;
819- let stop_listen = Arc :: clone ( & stop_running ) ;
829+ let mut stop_listen = self . stop_receiver . clone ( ) ;
820830 let listening_address = listening_address. clone ( ) ;
821831
822832 let bind_addr = listening_address
@@ -831,18 +841,22 @@ impl Node {
831841 "Failed to bind to listen address/port - is something else already listening on it?" ,
832842 ) ;
833843 loop {
834- if stop_listen. load ( Ordering :: Acquire ) {
835- return ;
836- }
837844 let peer_mgr = Arc :: clone ( & peer_manager_connection_handler) ;
838- let tcp_stream = listener. accept ( ) . await . unwrap ( ) . 0 ;
839- tokio:: spawn ( async move {
840- lightning_net_tokio:: setup_inbound (
841- Arc :: clone ( & peer_mgr) ,
842- tcp_stream. into_std ( ) . unwrap ( ) ,
843- )
844- . await ;
845- } ) ;
845+ tokio:: select! {
846+ _ = stop_listen. changed( ) => {
847+ return ;
848+ }
849+ res = listener. accept( ) => {
850+ let tcp_stream = res. unwrap( ) . 0 ;
851+ tokio:: spawn( async move {
852+ lightning_net_tokio:: setup_inbound(
853+ Arc :: clone( & peer_mgr) ,
854+ tcp_stream. into_std( ) . unwrap( ) ,
855+ )
856+ . await ;
857+ } ) ;
858+ }
859+ }
846860 }
847861 } ) ;
848862 }
@@ -852,36 +866,38 @@ impl Node {
852866 let connect_pm = Arc :: clone ( & self . peer_manager ) ;
853867 let connect_logger = Arc :: clone ( & self . logger ) ;
854868 let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
855- let stop_connect = Arc :: clone ( & stop_running ) ;
869+ let mut stop_connect = self . stop_receiver . clone ( ) ;
856870 runtime. spawn ( async move {
857871 let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
858872 interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
859873 loop {
860- if stop_connect. load ( Ordering :: Acquire ) {
861- return ;
862- }
863-
864- interval. tick ( ) . await ;
865- let pm_peers = connect_pm
866- . get_peer_node_ids ( )
867- . iter ( )
868- . map ( |( peer, _addr) | * peer)
869- . collect :: < Vec < _ > > ( ) ;
870- for node_id in connect_cm
871- . list_channels ( )
872- . iter ( )
873- . map ( |chan| chan. counterparty . node_id )
874- . filter ( |id| !pm_peers. contains ( id) )
875- {
876- if let Some ( peer_info) = connect_peer_store. get_peer ( & node_id) {
877- let _ = do_connect_peer (
878- peer_info. node_id ,
879- peer_info. address ,
880- Arc :: clone ( & connect_pm) ,
881- Arc :: clone ( & connect_logger) ,
882- )
883- . await ;
884- }
874+ tokio:: select! {
875+ _ = stop_connect. changed( ) => {
876+ return ;
877+ }
878+ _ = interval. tick( ) => {
879+ let pm_peers = connect_pm
880+ . get_peer_node_ids( )
881+ . iter( )
882+ . map( |( peer, _addr) | * peer)
883+ . collect:: <Vec <_>>( ) ;
884+ for node_id in connect_cm
885+ . list_channels( )
886+ . iter( )
887+ . map( |chan| chan. counterparty. node_id)
888+ . filter( |id| !pm_peers. contains( id) )
889+ {
890+ if let Some ( peer_info) = connect_peer_store. get_peer( & node_id) {
891+ let _ = do_connect_peer(
892+ peer_info. node_id,
893+ peer_info. address,
894+ Arc :: clone( & connect_pm) ,
895+ Arc :: clone( & connect_logger) ,
896+ )
897+ . await ;
898+ }
899+ }
900+ }
885901 }
886902 }
887903 } ) ;
@@ -890,28 +906,32 @@ impl Node {
890906 let bcast_cm = Arc :: clone ( & self . channel_manager ) ;
891907 let bcast_pm = Arc :: clone ( & self . peer_manager ) ;
892908 let bcast_config = Arc :: clone ( & self . config ) ;
893- let stop_bcast = Arc :: clone ( & stop_running ) ;
909+ let mut stop_bcast = self . stop_receiver . clone ( ) ;
894910 runtime. spawn ( async move {
895911 let mut interval = tokio:: time:: interval ( NODE_ANN_BCAST_INTERVAL ) ;
896912 loop {
897- if stop_bcast. load ( Ordering :: Acquire ) {
898- return ;
899- }
900-
901- if !bcast_cm. list_channels ( ) . iter ( ) . any ( |chan| chan. is_public ) { continue ; }
902-
903- interval. tick ( ) . await ;
904-
905- if !bcast_cm. list_channels ( ) . iter ( ) . any ( |chan| chan. is_public ) { continue ; }
913+ tokio:: select! {
914+ _ = stop_bcast. changed( ) => {
915+ return ;
916+ }
917+ _ = interval. tick( ) , if bcast_cm. list_channels( ) . iter( ) . any( |chan| chan. is_public) => {
918+ while bcast_pm. get_peer_node_ids( ) . is_empty( ) {
919+ // Sleep a bit and retry if we don't have any peers yet.
920+ tokio:: time:: sleep( Duration :: from_secs( 5 ) ) . await ;
921+
922+ // Check back if we need to stop.
923+ match stop_bcast. has_changed( ) {
924+ Ok ( false ) => { } ,
925+ Ok ( true ) => return ,
926+ Err ( _) => return ,
927+ }
928+ }
906929
907- while bcast_pm. get_peer_node_ids ( ) . is_empty ( ) {
908- // Sleep a bit and retry if we don't have any peers yet.
909- tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
930+ let addresses =
931+ bcast_config. listening_address. iter( ) . cloned( ) . map( |a| a. 0 ) . collect( ) ;
932+ bcast_pm. broadcast_node_announcement( [ 0 ; 3 ] , [ 0 ; 32 ] , addresses) ;
933+ }
910934 }
911-
912- let addresses =
913- bcast_config. listening_address . iter ( ) . cloned ( ) . map ( |a| a. 0 ) . collect ( ) ;
914- bcast_pm. broadcast_node_announcement ( [ 0 ; 3 ] , [ 0 ; 32 ] , addresses) ;
915935 }
916936 } ) ;
917937
@@ -924,15 +944,17 @@ impl Node {
924944 let background_peer_man = Arc :: clone ( & self . peer_manager ) ;
925945 let background_logger = Arc :: clone ( & self . logger ) ;
926946 let background_scorer = Arc :: clone ( & self . scorer ) ;
927- let stop_background_processing = Arc :: clone ( & stop_running ) ;
947+ let stop_bp = self . stop_receiver . clone ( ) ;
928948 let sleeper = move |d| {
929- let stop = Arc :: clone ( & stop_background_processing ) ;
949+ let mut stop = stop_bp . clone ( ) ;
930950 Box :: pin ( async move {
931- if stop. load ( Ordering :: Acquire ) {
932- true
933- } else {
934- tokio:: time:: sleep ( d) . await ;
935- false
951+ tokio:: select! {
952+ _ = stop. changed( ) => {
953+ true
954+ }
955+ _ = tokio:: time:: sleep( d) => {
956+ false
957+ }
936958 }
937959 } )
938960 } ;
@@ -964,7 +986,7 @@ impl Node {
964986 pub fn stop ( & self ) -> Result < ( ) , Error > {
965987 let runtime = self . runtime . write ( ) . unwrap ( ) . take ( ) . ok_or ( Error :: NotRunning ) ?;
966988 // Stop the runtime.
967- self . stop_running . store ( true , Ordering :: Release ) ;
989+ self . stop_sender . send ( ( ) ) . expect ( "Failed to send stop signal" ) ;
968990
969991 // Stop disconnect peers.
970992 self . peer_manager . disconnect_all_peers ( ) ;
0 commit comments