11use crate :: {
2- ChannelManager , EventSender , LdkLiteChainAccess , LdkLiteConfig , NetworkGraph ,
2+ ChannelManager , Error , FilesystemPersister , LdkLiteChainAccess , LdkLiteConfig , NetworkGraph ,
33 PaymentInfoStorage ,
44} ;
55
@@ -12,48 +12,153 @@ use crate::logger::{
1212use lightning:: chain:: chaininterface:: { BroadcasterInterface , ConfirmationTarget , FeeEstimator } ;
1313use lightning:: chain:: keysinterface:: KeysManager ;
1414use lightning:: util:: events as ldk_events;
15+ use lightning:: util:: persist:: KVStorePersister ;
16+ use lightning:: util:: ser:: { Readable , ReadableArgs , Writeable , Writer } ;
1517
1618use bdk:: database:: MemoryDatabase ;
1719
1820use bitcoin:: secp256k1:: Secp256k1 ;
19- //use std::collections::{hash_map, VecDeque};
20- //use std::iter::Iterator;
21- use std:: sync:: Arc ;
21+ use std:: collections:: VecDeque ;
22+ use std:: sync:: { Arc , Condvar , Mutex } ;
23+
24+ /// The event queue will be persisted under this key.
25+ pub ( crate ) const EVENTS_PERSISTENCE_KEY : & str = "events" ;
2226
2327/// An LdkLiteEvent that should be handled by the user.
28+ #[ derive( Debug , Clone ) ]
2429pub enum LdkLiteEvent {
2530 /// asdf
2631 Test ,
2732}
2833
34+ impl Readable for LdkLiteEvent {
35+ fn read < R : lightning:: io:: Read > (
36+ reader : & mut R ,
37+ ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
38+ match Readable :: read ( reader) ? {
39+ // TODO
40+ 0u8 => Ok ( LdkLiteEvent :: Test ) ,
41+ _ => Ok ( LdkLiteEvent :: Test ) ,
42+ }
43+ }
44+ }
45+
46+ impl Writeable for LdkLiteEvent {
47+ fn write < W : Writer > ( & self , writer : & mut W ) -> Result < ( ) , lightning:: io:: Error > {
48+ match self {
49+ Test => {
50+ // TODO
51+ 0u8 . write ( writer) ?;
52+ Ok ( ( ) )
53+ }
54+ }
55+ }
56+ }
57+
58+ pub ( crate ) struct LdkLiteEventQueue < K : KVStorePersister > {
59+ queue : Mutex < EventQueueSerWrapper > ,
60+ notifier : Condvar ,
61+ persister : Arc < K > ,
62+ }
63+
64+ impl < K : KVStorePersister > LdkLiteEventQueue < K > {
65+ pub ( crate ) fn new ( persister : Arc < K > ) -> Self {
66+ let queue: Mutex < EventQueueSerWrapper > = Mutex :: new ( EventQueueSerWrapper ( VecDeque :: new ( ) ) ) ;
67+ let notifier = Condvar :: new ( ) ;
68+ Self { queue, notifier, persister }
69+ }
70+ pub ( crate ) fn add_event ( & self , event : LdkLiteEvent ) -> Result < ( ) , Error > {
71+ let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
72+ locked_queue. 0 . push_back ( event) ;
73+
74+ self . persister . persist ( EVENTS_PERSISTENCE_KEY , & * locked_queue) ?;
75+
76+ self . notifier . notify_one ( ) ;
77+ Ok ( ( ) )
78+ }
79+
80+ pub ( crate ) fn next_event ( & self ) -> LdkLiteEvent {
81+ let locked_queue = self
82+ . notifier
83+ . wait_while ( self . queue . lock ( ) . unwrap ( ) , |queue| queue. 0 . is_empty ( ) )
84+ . unwrap ( ) ;
85+ locked_queue. 0 . front ( ) . unwrap ( ) . clone ( )
86+ }
87+
88+ pub ( crate ) fn event_handled ( & self ) -> Result < ( ) , Error > {
89+ let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
90+ locked_queue. 0 . pop_front ( ) ;
91+ self . persister . persist ( EVENTS_PERSISTENCE_KEY , & * locked_queue) ?;
92+ self . notifier . notify_one ( ) ;
93+ Ok ( ( ) )
94+ }
95+ }
96+
97+ impl < K : KVStorePersister > ReadableArgs < Arc < K > > for LdkLiteEventQueue < K > {
98+ #[ inline]
99+ fn read < R : lightning:: io:: Read > (
100+ reader : & mut R , persister : Arc < K > ,
101+ ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
102+ let queue: Mutex < EventQueueSerWrapper > = Mutex :: new ( Readable :: read ( reader) ?) ;
103+ let notifier = Condvar :: new ( ) ;
104+ Ok ( Self { queue, notifier, persister } )
105+ }
106+ }
107+
108+ struct EventQueueSerWrapper ( VecDeque < LdkLiteEvent > ) ;
109+
110+ impl Readable for EventQueueSerWrapper {
111+ fn read < R : lightning:: io:: Read > (
112+ reader : & mut R ,
113+ ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
114+ let len: u16 = Readable :: read ( reader) ?;
115+ let mut queue = VecDeque :: with_capacity ( len as usize ) ;
116+ for _ in 0 ..len {
117+ queue. push_back ( Readable :: read ( reader) ?) ;
118+ }
119+ Ok ( EventQueueSerWrapper ( queue) )
120+ }
121+ }
122+
123+ impl Writeable for EventQueueSerWrapper {
124+ fn write < W : Writer > ( & self , writer : & mut W ) -> Result < ( ) , lightning:: io:: Error > {
125+ ( self . 0 . len ( ) as u16 ) . write ( writer) ?;
126+ for e in self . 0 . iter ( ) {
127+ e. write ( writer) ?;
128+ }
129+ Ok ( ( ) )
130+ }
131+ }
132+
29133pub ( crate ) struct LdkLiteEventHandler {
30134 chain_access : Arc < LdkLiteChainAccess < MemoryDatabase > > ,
135+ event_queue : Arc < LdkLiteEventQueue < FilesystemPersister > > ,
31136 channel_manager : Arc < ChannelManager > ,
32137 _network_graph : Arc < NetworkGraph > ,
33138 keys_manager : Arc < KeysManager > ,
34139 _inbound_payments : Arc < PaymentInfoStorage > ,
35140 _outbound_payments : Arc < PaymentInfoStorage > ,
36- _event_sender : EventSender ,
37141 logger : Arc < FilesystemLogger > ,
38142 _config : Arc < LdkLiteConfig > ,
39143}
40144
41145impl LdkLiteEventHandler {
42146 pub fn new (
43147 chain_access : Arc < LdkLiteChainAccess < MemoryDatabase > > ,
148+ event_queue : Arc < LdkLiteEventQueue < FilesystemPersister > > ,
44149 channel_manager : Arc < ChannelManager > , _network_graph : Arc < NetworkGraph > ,
45150 keys_manager : Arc < KeysManager > , _inbound_payments : Arc < PaymentInfoStorage > ,
46- _outbound_payments : Arc < PaymentInfoStorage > , _event_sender : EventSender ,
47- logger : Arc < FilesystemLogger > , _config : Arc < LdkLiteConfig > ,
151+ _outbound_payments : Arc < PaymentInfoStorage > , logger : Arc < FilesystemLogger > ,
152+ _config : Arc < LdkLiteConfig > ,
48153 ) -> Self {
49154 Self {
155+ event_queue,
50156 chain_access,
51157 channel_manager,
52158 _network_graph,
53159 keys_manager,
54160 _inbound_payments,
55161 _outbound_payments,
56- _event_sender,
57162 logger,
58163 _config,
59164 }
0 commit comments