@@ -36,6 +36,7 @@ use lightning::util::persist::KVStore;
36
36
use lightning:: util:: ser:: Writeable ;
37
37
38
38
use core:: ops:: Deref ;
39
+ use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
39
40
use core:: time:: Duration ;
40
41
41
42
use alloc:: string:: String ;
@@ -140,6 +141,7 @@ where
140
141
node_signer : NS ,
141
142
kv_store : K ,
142
143
last_pruning : Mutex < Option < LSPSDateTime > > ,
144
+ persistence_in_flight : AtomicUsize ,
143
145
}
144
146
145
147
impl < CM : Deref , NS : Deref , K : Deref + Clone , TP : Deref > LSPS5ServiceHandler < CM , NS , K , TP >
@@ -167,6 +169,7 @@ where
167
169
node_signer,
168
170
kv_store,
169
171
last_pruning : Mutex :: new ( None ) ,
172
+ persistence_in_flight : AtomicUsize :: new ( 0 ) ,
170
173
}
171
174
}
172
175
@@ -245,63 +248,80 @@ where
245
248
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
246
249
// introduce some batching to upper-bound the number of requests inflight at any given
247
250
// time.
248
- let mut need_remove = Vec :: new ( ) ;
249
- let mut need_persist = Vec :: new ( ) ;
250
-
251
- self . check_prune_stale_webhooks ( & mut self . per_peer_state . write ( ) . unwrap ( ) ) ;
252
- {
253
- let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
254
-
255
- for ( client_id, peer_state) in outer_state_lock. iter ( ) {
256
- let is_prunable = peer_state. is_prunable ( ) ;
257
- let has_open_channel = self . client_has_open_channel ( client_id) ;
258
- if is_prunable && !has_open_channel {
259
- need_remove. push ( * client_id) ;
260
- } else if peer_state. needs_persist {
261
- need_persist. push ( * client_id) ;
262
- }
263
- }
264
- }
265
251
266
- for client_id in need_persist. into_iter ( ) {
267
- debug_assert ! ( !need_remove. contains( & client_id) ) ;
268
- self . persist_peer_state ( client_id) . await ?;
252
+ if self . persistence_in_flight . fetch_add ( 1 , Ordering :: AcqRel ) > 0 {
253
+ // If we're not the first event processor to get here, just return early, the increment
254
+ // we just did will be treated as "go around again" at the end.
255
+ return Ok ( ( ) ) ;
269
256
}
270
257
271
- for client_id in need_remove {
272
- let mut future_opt = None ;
258
+ loop {
259
+ let mut need_remove = Vec :: new ( ) ;
260
+ let mut need_persist = Vec :: new ( ) ;
261
+
262
+ self . check_prune_stale_webhooks ( & mut self . per_peer_state . write ( ) . unwrap ( ) ) ;
273
263
{
274
- // We need to take the `per_peer_state` write lock to remove an entry, but also
275
- // have to hold it until after the `remove` call returns (but not through
276
- // future completion) to ensure that writes for the peer's state are
277
- // well-ordered with other `persist_peer_state` calls even across the removal
278
- // itself.
279
- let mut per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
280
- if let Entry :: Occupied ( mut entry) = per_peer_state. entry ( client_id) {
281
- let state = entry. get_mut ( ) ;
282
- if state. is_prunable ( ) && !self . client_has_open_channel ( & client_id) {
283
- entry. remove ( ) ;
284
- let key = client_id. to_string ( ) ;
285
- future_opt = Some ( self . kv_store . remove (
286
- LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
287
- LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
288
- & key,
289
- ) ) ;
264
+ let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
265
+
266
+ for ( client_id, peer_state) in outer_state_lock. iter ( ) {
267
+ let is_prunable = peer_state. is_prunable ( ) ;
268
+ let has_open_channel = self . client_has_open_channel ( client_id) ;
269
+ if is_prunable && !has_open_channel {
270
+ need_remove. push ( * client_id) ;
271
+ } else if peer_state. needs_persist {
272
+ need_persist. push ( * client_id) ;
273
+ }
274
+ }
275
+ }
276
+
277
+ for client_id in need_persist. into_iter ( ) {
278
+ debug_assert ! ( !need_remove. contains( & client_id) ) ;
279
+ self . persist_peer_state ( client_id) . await ?;
280
+ }
281
+
282
+ for client_id in need_remove {
283
+ let mut future_opt = None ;
284
+ {
285
+ // We need to take the `per_peer_state` write lock to remove an entry, but also
286
+ // have to hold it until after the `remove` call returns (but not through
287
+ // future completion) to ensure that writes for the peer's state are
288
+ // well-ordered with other `persist_peer_state` calls even across the removal
289
+ // itself.
290
+ let mut per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
291
+ if let Entry :: Occupied ( mut entry) = per_peer_state. entry ( client_id) {
292
+ let state = entry. get_mut ( ) ;
293
+ if state. is_prunable ( ) && !self . client_has_open_channel ( & client_id) {
294
+ entry. remove ( ) ;
295
+ let key = client_id. to_string ( ) ;
296
+ future_opt = Some ( self . kv_store . remove (
297
+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
298
+ LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
299
+ & key,
300
+ ) ) ;
301
+ } else {
302
+ // If the peer was re-added, force a re-persist of the current state.
303
+ state. needs_persist = true ;
304
+ }
290
305
} else {
291
- // If the peer was re-added, force a re-persist of the current state.
292
- state. needs_persist = true ;
306
+ // This should never happen, we can only have one `persist` call
307
+ // in-progress at once and map entries are only removed by it.
308
+ debug_assert ! ( false ) ;
293
309
}
310
+ }
311
+ if let Some ( future) = future_opt {
312
+ future. await ?;
294
313
} else {
295
- // This should never happen, we can only have one `persist` call
296
- // in-progress at once and map entries are only removed by it.
297
- debug_assert ! ( false ) ;
314
+ self . persist_peer_state ( client_id) . await ?;
298
315
}
299
316
}
300
- if let Some ( future) = future_opt {
301
- future. await ?;
302
- } else {
303
- self . persist_peer_state ( client_id) . await ?;
317
+
318
+ if self . persistence_in_flight . fetch_sub ( 1 , Ordering :: AcqRel ) != 1 {
319
+ // If another thread incremented the state while we were running we should go
320
+ // around again, but only once.
321
+ self . persistence_in_flight . store ( 1 , Ordering :: Release ) ;
322
+ continue ;
304
323
}
324
+ break ;
305
325
}
306
326
307
327
Ok ( ( ) )
0 commit comments