@@ -34,7 +34,7 @@ mod logger;
3434use access:: LdkLiteChainAccess ;
3535pub use error:: LdkLiteError as Error ;
3636pub use event:: LdkLiteEvent ;
37- use event:: LdkLiteEventHandler ;
37+ use event:: { LdkLiteEventHandler , LdkLiteEventQueue } ;
3838
3939#[ allow( unused_imports) ]
4040use logger:: {
@@ -81,16 +81,12 @@ use std::collections::HashMap;
8181use std:: fs;
8282use std:: net:: SocketAddr ;
8383use std:: sync:: atomic:: { AtomicBool , Ordering } ;
84- use std:: sync:: mpsc;
8584use std:: sync:: { Arc , Mutex , RwLock } ;
8685use std:: thread;
8786use std:: time:: { Duration , SystemTime } ;
8887
8988// TODO: Is MemoryDatabase okay to use?
9089
91- // The number of messages we buffer in the used channels.
92- const CHANNEL_BUF_SIZE : usize = 1000 ;
93-
9490// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
9591// number of blocks after which BDK stops looking for scripts belonging to the wallet.
9692const BDK_CLIENT_STOP_GAP : usize = 20 ;
@@ -328,17 +324,23 @@ impl LdkLiteBuilder {
328324 let inbound_payments = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
329325 let outbound_payments = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
330326
331- // Step 14: Handle LDK Events
332- let event_queue = mpsc:: sync_channel ( CHANNEL_BUF_SIZE ) ;
333- let event_sender = event_queue. 0 . clone ( ) ;
327+ // Step 14: Restore event handler from disk or create a new one.
328+ let event_queue = if let Ok ( mut f) =
329+ fs:: File :: open ( format ! ( "{}/{}" , ldk_data_dir. clone( ) , event:: EVENTS_PERSISTENCE_KEY ) )
330+ {
331+ Arc :: new ( LdkLiteEventQueue :: read ( & mut f, Arc :: clone ( & persister) ) ?)
332+ } else {
333+ Arc :: new ( LdkLiteEventQueue :: new ( Arc :: clone ( & persister) ) )
334+ } ;
335+
334336 let event_handler = LdkLiteEventHandler :: new (
335337 Arc :: clone ( & chain_access) ,
338+ Arc :: clone ( & event_queue) ,
336339 Arc :: clone ( & channel_manager) ,
337340 Arc :: clone ( & network_graph) ,
338341 Arc :: clone ( & keys_manager) ,
339342 Arc :: clone ( & inbound_payments) ,
340343 Arc :: clone ( & outbound_payments) ,
341- event_sender,
342344 Arc :: clone ( & logger) ,
343345 Arc :: clone ( & config) ,
344346 ) ;
@@ -365,6 +367,7 @@ impl LdkLiteBuilder {
365367 running,
366368 config,
367369 chain_access,
370+ event_queue,
368371 channel_manager,
369372 chain_monitor,
370373 peer_manager,
@@ -377,7 +380,6 @@ impl LdkLiteBuilder {
377380 invoice_payer,
378381 inbound_payments,
379382 outbound_payments,
380- event_queue,
381383 } )
382384 }
383385}
@@ -398,6 +400,7 @@ pub struct LdkLite {
398400 running : RwLock < Option < LdkLiteRuntime > > ,
399401 config : Arc < LdkLiteConfig > ,
400402 chain_access : Arc < LdkLiteChainAccess < MemoryDatabase > > ,
403+ event_queue : Arc < LdkLiteEventQueue < FilesystemPersister > > ,
401404 channel_manager : Arc < ChannelManager > ,
402405 chain_monitor : Arc < ChainMonitor > ,
403406 peer_manager : Arc < PeerManager > ,
@@ -410,7 +413,6 @@ pub struct LdkLite {
410413 invoice_payer : Arc < InvoicePayer < LdkLiteEventHandler > > ,
411414 inbound_payments : Arc < PaymentInfoStorage > ,
412415 outbound_payments : Arc < PaymentInfoStorage > ,
413- event_queue : ( EventSender , EventReceiver ) ,
414416}
415417
416418impl LdkLite {
@@ -579,9 +581,16 @@ impl LdkLite {
579581 } )
580582 }
581583
582- /// Returns the next event from the event queue. Blocks until a new event is available.
583- pub fn next_event ( & self ) -> Result < LdkLiteEvent , Error > {
584- Ok ( self . event_queue . 1 . recv ( ) ?)
584+ /// Blocks until the next event is available.
585+ ///
586+ /// Note: this will always return the same event until handling is confirmed via [`event_handled`].
587+ pub fn next_event ( & self ) -> LdkLiteEvent {
588+ self . event_queue . next_event ( )
589+ }
590+
591+ /// Confirm the last retrieved event handled.
592+ pub fn event_handled ( & self ) -> Result < ( ) , Error > {
593+ self . event_queue . event_handled ( )
585594 }
586595
587596 /// Returns our own node id
@@ -955,8 +964,5 @@ pub(crate) type NetworkGraph = gossip::NetworkGraph<Arc<FilesystemLogger>>;
955964
956965pub ( crate ) type PaymentInfoStorage = Mutex < HashMap < PaymentHash , PaymentInfo > > ;
957966
958- pub ( crate ) type EventSender = mpsc:: SyncSender < LdkLiteEvent > ;
959- pub ( crate ) type EventReceiver = mpsc:: Receiver < LdkLiteEvent > ;
960-
961967#[ cfg( test) ]
962968mod tests { }
0 commit comments