@@ -329,7 +329,13 @@ impl Builder {
329329 EsploraBlockchain :: from_client ( tx_sync. client ( ) . clone ( ) , BDK_CLIENT_STOP_GAP )
330330 . with_concurrency ( BDK_CLIENT_CONCURRENCY ) ;
331331
332- let wallet = Arc :: new ( Wallet :: new ( blockchain, bdk_wallet, Arc :: clone ( & logger) ) ) ;
332+ let runtime = Arc :: new ( RwLock :: new ( None ) ) ;
333+ let wallet = Arc :: new ( Wallet :: new (
334+ blockchain,
335+ bdk_wallet,
336+ Arc :: clone ( & runtime) ,
337+ Arc :: clone ( & logger) ,
338+ ) ) ;
333339
334340 let kv_store = Arc :: new ( FilesystemStore :: new ( ldk_data_dir. clone ( ) . into ( ) ) ) ;
335341
@@ -538,10 +544,11 @@ impl Builder {
538544 }
539545 } ;
540546
541- let running = RwLock :: new ( None ) ;
547+ let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
542548
543549 Node {
544- running,
550+ runtime,
551+ stop_running,
545552 config,
546553 wallet,
547554 tx_sync,
@@ -561,18 +568,12 @@ impl Builder {
561568 }
562569}
563570
564- /// Wraps all objects that need to be preserved during the run time of [`Node`]. Will be dropped
565- /// upon [`Node::stop()`].
566- struct Runtime {
567- tokio_runtime : Arc < tokio:: runtime:: Runtime > ,
568- stop_runtime : Arc < AtomicBool > ,
569- }
570-
571571/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
572572///
573573/// Needs to be initialized and instantiated through [`Builder::build`].
574574pub struct Node {
575- running : RwLock < Option < Runtime > > ,
575+ runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
576+ stop_running : Arc < AtomicBool > ,
576577 config : Arc < Config > ,
577578 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
578579 tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -598,49 +599,15 @@ impl Node {
598599 /// a thread-safe manner.
599600 pub fn start ( & self ) -> Result < ( ) , Error > {
600601 // Acquire a run lock and hold it until we're setup.
601- let mut run_lock = self . running . write ( ) . unwrap ( ) ;
602- if run_lock . is_some ( ) {
602+ let mut runtime_lock = self . runtime . write ( ) . unwrap ( ) ;
603+ if runtime_lock . is_some ( ) {
603604 // We're already running.
604605 return Err ( Error :: AlreadyRunning ) ;
605606 }
606607
607- let runtime = self . setup_runtime ( ) ?;
608- * run_lock = Some ( runtime) ;
609- Ok ( ( ) )
610- }
611-
612- /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
613- ///
614- /// After this returns most API methods will return [`Error::NotRunning`].
615- pub fn stop ( & self ) -> Result < ( ) , Error > {
616- let mut run_lock = self . running . write ( ) . unwrap ( ) ;
617- if run_lock. is_none ( ) {
618- return Err ( Error :: NotRunning ) ;
619- }
620-
621- let runtime = run_lock. as_ref ( ) . unwrap ( ) ;
622-
623- // Stop the runtime.
624- runtime. stop_runtime . store ( true , Ordering :: Release ) ;
625-
626- // Stop disconnect peers.
627- self . peer_manager . disconnect_all_peers ( ) ;
628-
629- // Drop the held runtimes.
630- self . wallet . drop_runtime ( ) ;
608+ let runtime = tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ;
631609
632- // Drop the runtime, which stops the background processor and any possibly remaining tokio threads.
633- * run_lock = None ;
634- Ok ( ( ) )
635- }
636-
637- fn setup_runtime ( & self ) -> Result < Runtime , Error > {
638- let tokio_runtime =
639- Arc :: new ( tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) ) ;
640-
641- self . wallet . set_runtime ( Arc :: clone ( & tokio_runtime) ) ;
642-
643- let stop_runtime = Arc :: new ( AtomicBool :: new ( false ) ) ;
610+ let stop_running = Arc :: new ( AtomicBool :: new ( false ) ) ;
644611
645612 let event_handler = Arc :: new ( EventHandler :: new (
646613 Arc :: clone ( & self . wallet ) ,
@@ -649,7 +616,7 @@ impl Node {
649616 Arc :: clone ( & self . network_graph ) ,
650617 Arc :: clone ( & self . keys_manager ) ,
651618 Arc :: clone ( & self . payment_store ) ,
652- Arc :: clone ( & tokio_runtime ) ,
619+ Arc :: clone ( & self . runtime ) ,
653620 Arc :: clone ( & self . logger ) ,
654621 Arc :: clone ( & self . config ) ,
655622 ) ) ;
@@ -660,7 +627,7 @@ impl Node {
660627 let sync_cman = Arc :: clone ( & self . channel_manager ) ;
661628 let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
662629 let sync_logger = Arc :: clone ( & self . logger ) ;
663- let stop_sync = Arc :: clone ( & stop_runtime ) ;
630+ let stop_sync = Arc :: clone ( & stop_running ) ;
664631
665632 std:: thread:: spawn ( move || {
666633 tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
@@ -691,8 +658,8 @@ impl Node {
691658 } ) ;
692659
693660 let sync_logger = Arc :: clone ( & self . logger ) ;
694- let stop_sync = Arc :: clone ( & stop_runtime ) ;
695- tokio_runtime . spawn ( async move {
661+ let stop_sync = Arc :: clone ( & stop_running ) ;
662+ runtime . spawn ( async move {
696663 loop {
697664 if stop_sync. load ( Ordering :: Acquire ) {
698665 return ;
@@ -719,10 +686,10 @@ impl Node {
719686 if let Some ( listening_address) = & self . config . listening_address {
720687 // Setup networking
721688 let peer_manager_connection_handler = Arc :: clone ( & self . peer_manager ) ;
722- let stop_listen = Arc :: clone ( & stop_runtime ) ;
689+ let stop_listen = Arc :: clone ( & stop_running ) ;
723690 let listening_address = listening_address. clone ( ) ;
724691
725- tokio_runtime . spawn ( async move {
692+ runtime . spawn ( async move {
726693 let listener =
727694 tokio:: net:: TcpListener :: bind ( listening_address) . await . expect (
728695 "Failed to bind to listen address/port - is something else already listening on it?" ,
@@ -749,8 +716,8 @@ impl Node {
749716 let connect_pm = Arc :: clone ( & self . peer_manager ) ;
750717 let connect_logger = Arc :: clone ( & self . logger ) ;
751718 let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
752- let stop_connect = Arc :: clone ( & stop_runtime ) ;
753- tokio_runtime . spawn ( async move {
719+ let stop_connect = Arc :: clone ( & stop_running ) ;
720+ runtime . spawn ( async move {
754721 let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
755722 loop {
756723 if stop_connect. load ( Ordering :: Acquire ) {
@@ -790,7 +757,7 @@ impl Node {
790757 let background_peer_man = Arc :: clone ( & self . peer_manager ) ;
791758 let background_logger = Arc :: clone ( & self . logger ) ;
792759 let background_scorer = Arc :: clone ( & self . scorer ) ;
793- let stop_background_processing = Arc :: clone ( & stop_runtime ) ;
760+ let stop_background_processing = Arc :: clone ( & stop_running ) ;
794761 let sleeper = move |d| {
795762 let stop = Arc :: clone ( & stop_background_processing) ;
796763 Box :: pin ( async move {
@@ -803,7 +770,7 @@ impl Node {
803770 } )
804771 } ;
805772
806- tokio_runtime . spawn ( async move {
773+ runtime . spawn ( async move {
807774 process_events_async (
808775 background_persister,
809776 |e| background_event_handler. handle_event ( e) ,
@@ -820,7 +787,23 @@ impl Node {
820787 . expect ( "Failed to process events" ) ;
821788 } ) ;
822789
823- Ok ( Runtime { tokio_runtime, stop_runtime } )
790+ * runtime_lock = Some ( runtime) ;
791+ Ok ( ( ) )
792+ }
793+
794+ /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
795+ ///
796+ /// After this returns most API methods will return [`Error::NotRunning`].
797+ pub fn stop ( & self ) -> Result < ( ) , Error > {
798+ let runtime = self . runtime . write ( ) . unwrap ( ) . take ( ) . ok_or ( Error :: NotRunning ) ?;
799+ // Stop the runtime.
800+ self . stop_running . store ( true , Ordering :: Release ) ;
801+
802+ // Stop disconnect peers.
803+ self . peer_manager . disconnect_all_peers ( ) ;
804+
805+ runtime. shutdown_timeout ( Duration :: from_secs ( 10 ) ) ;
806+ Ok ( ( ) )
824807 }
825808
826809 /// Blocks until the next event is available.
@@ -870,12 +853,11 @@ impl Node {
870853 pub fn connect_open_channel (
871854 & self , node_pubkey_and_address : & str , channel_amount_sats : u64 , announce_channel : bool ,
872855 ) -> Result < ( ) , Error > {
873- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
874- if runtime_lock . is_none ( ) {
856+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
857+ if rt_lock . is_none ( ) {
875858 return Err ( Error :: NotRunning ) ;
876859 }
877-
878- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
860+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
879861
880862 let cur_balance = self . wallet . get_balance ( ) ?;
881863 if cur_balance. get_spendable ( ) < channel_amount_sats {
@@ -893,7 +875,7 @@ impl Node {
893875 let con_pm = Arc :: clone ( & self . peer_manager ) ;
894876
895877 tokio:: task:: block_in_place ( move || {
896- runtime. tokio_runtime . block_on ( async move {
878+ runtime. block_on ( async move {
897879 let res =
898880 connect_peer_if_necessary ( con_peer_pubkey, con_peer_addr, con_pm, con_logger)
899881 . await ;
@@ -947,10 +929,12 @@ impl Node {
947929 ///
948930 /// Note that the wallets will be also synced regularly in the background.
949931 pub fn sync_wallets ( & self ) -> Result < ( ) , Error > {
950- let runtime_lock = self . running . read ( ) . unwrap ( ) ;
951- if runtime_lock . is_none ( ) {
932+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
933+ if rt_lock . is_none ( ) {
952934 return Err ( Error :: NotRunning ) ;
953935 }
936+ let runtime = rt_lock. as_ref ( ) . unwrap ( ) ;
937+
954938 let wallet = Arc :: clone ( & self . wallet ) ;
955939 let tx_sync = Arc :: clone ( & self . tx_sync ) ;
956940 let sync_cman = Arc :: clone ( & self . channel_manager ) ;
@@ -961,7 +945,6 @@ impl Node {
961945 & * sync_cmon as & ( dyn Confirm + Sync + Send ) ,
962946 ] ;
963947
964- let runtime = runtime_lock. as_ref ( ) . unwrap ( ) ;
965948 tokio:: task:: block_in_place ( move || {
966949 tokio:: runtime:: Builder :: new_current_thread ( ) . enable_all ( ) . build ( ) . unwrap ( ) . block_on (
967950 async move {
@@ -986,7 +969,7 @@ impl Node {
986969
987970 let sync_logger = Arc :: clone ( & self . logger ) ;
988971 tokio:: task:: block_in_place ( move || {
989- runtime. tokio_runtime . block_on ( async move {
972+ runtime. block_on ( async move {
990973 let now = Instant :: now ( ) ;
991974 match tx_sync. sync ( confirmables) . await {
992975 Ok ( ( ) ) => {
@@ -1021,7 +1004,8 @@ impl Node {
10211004
10221005 /// Send a payement given an invoice.
10231006 pub fn send_payment ( & self , invoice : Invoice ) -> Result < PaymentHash , Error > {
1024- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1007+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1008+ if rt_lock. is_none ( ) {
10251009 return Err ( Error :: NotRunning ) ;
10261010 }
10271011
@@ -1087,7 +1071,8 @@ impl Node {
10871071 pub fn send_payment_using_amount (
10881072 & self , invoice : Invoice , amount_msat : u64 ,
10891073 ) -> Result < PaymentHash , Error > {
1090- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1074+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1075+ if rt_lock. is_none ( ) {
10911076 return Err ( Error :: NotRunning ) ;
10921077 }
10931078
@@ -1175,7 +1160,8 @@ impl Node {
11751160 pub fn send_spontaneous_payment (
11761161 & self , amount_msat : u64 , node_id : & str ,
11771162 ) -> Result < PaymentHash , Error > {
1178- if self . running . read ( ) . unwrap ( ) . is_none ( ) {
1163+ let rt_lock = self . runtime . read ( ) . unwrap ( ) ;
1164+ if rt_lock. is_none ( ) {
11791165 return Err ( Error :: NotRunning ) ;
11801166 }
11811167
0 commit comments