@@ -20,6 +20,7 @@ use crate::message_queue::MessageQueue;
20
20
use crate :: persist:: {
21
21
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE , LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
22
22
} ;
23
+ use crate :: prelude:: hash_map:: Entry ;
23
24
use crate :: prelude:: * ;
24
25
use crate :: sync:: { Arc , Mutex , RwLock , RwLockWriteGuard } ;
25
26
use crate :: utils:: time:: TimeProvider ;
@@ -220,6 +221,8 @@ where
220
221
221
222
let key = counterparty_node_id. to_string ( ) ;
222
223
224
+ // Begin the write with the `per_peer_state` write lock held to avoid racing with
225
+ // potentially-in-flight `persist` calls writing state for the same peer.
223
226
self . kv_store . write (
224
227
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
225
228
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
@@ -244,36 +247,61 @@ where
244
247
// time.
245
248
let mut need_remove = Vec :: new ( ) ;
246
249
let mut need_persist = Vec :: new ( ) ;
250
+
251
+ self . check_prune_stale_webhooks ( & mut self . per_peer_state . write ( ) . unwrap ( ) ) ;
247
252
{
248
- let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
249
- self . check_prune_stale_webhooks ( & mut outer_state_lock) ;
253
+ let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
250
254
251
- outer_state_lock . retain ( | client_id, peer_state| {
255
+ for ( client_id, peer_state) in outer_state_lock . iter ( ) {
252
256
let is_prunable = peer_state. is_prunable ( ) ;
253
257
let has_open_channel = self . client_has_open_channel ( client_id) ;
254
258
if is_prunable && !has_open_channel {
255
259
need_remove. push ( * client_id) ;
256
260
} else if peer_state. needs_persist {
257
261
need_persist. push ( * client_id) ;
258
262
}
259
- !is_prunable || has_open_channel
260
- } ) ;
261
- } ;
263
+ }
264
+ }
262
265
263
- for counterparty_node_id in need_persist. into_iter ( ) {
264
- debug_assert ! ( !need_remove. contains( & counterparty_node_id ) ) ;
265
- self . persist_peer_state ( counterparty_node_id ) . await ?;
266
+ for client_id in need_persist. into_iter ( ) {
267
+ debug_assert ! ( !need_remove. contains( & client_id ) ) ;
268
+ self . persist_peer_state ( client_id ) . await ?;
266
269
}
267
270
268
- for counterparty_node_id in need_remove {
269
- let key = counterparty_node_id. to_string ( ) ;
270
- self . kv_store
271
- . remove (
272
- LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
273
- LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
274
- & key,
275
- )
276
- . await ?;
271
+ for client_id in need_remove {
272
+ let mut future_opt = None ;
273
+ {
274
+ // We need to take the `per_peer_state` write lock to remove an entry, but also
275
+ // have to hold it until after the `remove` call returns (but not through
276
+ // future completion) to ensure that writes for the peer's state are
277
+ // well-ordered with other `persist_peer_state` calls even across the removal
278
+ // itself.
279
+ let mut per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
280
+ if let Entry :: Occupied ( mut entry) = per_peer_state. entry ( client_id) {
281
+ let state = entry. get_mut ( ) ;
282
+ if state. is_prunable ( ) && !self . client_has_open_channel ( & client_id) {
283
+ entry. remove ( ) ;
284
+ let key = client_id. to_string ( ) ;
285
+ future_opt = Some ( self . kv_store . remove (
286
+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
287
+ LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
288
+ & key,
289
+ ) ) ;
290
+ } else {
291
+ // If the peer was re-added, force a re-persist of the current state.
292
+ state. needs_persist = true ;
293
+ }
294
+ } else {
295
+ // This should never happen, we can only have one `persist` call
296
+ // in-progress at once and map entries are only removed by it.
297
+ debug_assert ! ( false ) ;
298
+ }
299
+ }
300
+ if let Some ( future) = future_opt {
301
+ future. await ?;
302
+ } else {
303
+ self . persist_peer_state ( client_id) . await ?;
304
+ }
277
305
}
278
306
279
307
Ok ( ( ) )
@@ -761,7 +789,7 @@ impl PeerState {
761
789
} ) ;
762
790
}
763
791
764
- fn is_prunable ( & mut self ) -> bool {
792
+ fn is_prunable ( & self ) -> bool {
765
793
self . webhooks . is_empty ( )
766
794
}
767
795
}
0 commit comments