
Commit b419925

Remove pruned LSPS2/LSPS5 peer state entries from the KVStore
Previously, we'd persist peer states to the `KVStore`, but while we would eventually prune them from our in-memory state, we never removed them from the `KVStore`. Here, we change this and regularly prune and delete peer state entries from the `KVStore`. Note that we still prune the state-internal data on peer disconnection, but leave removal of the entries to our (BP-driven) async `persist` calls.
1 parent 62f1fad commit b419925
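
To make the new flow concrete, here is a minimal, self-contained sketch of the pattern this commit adopts: during a persist pass, prune in-memory peer state, collect which entries are dirty and which are now empty, rewrite the dirty ones, and delete the empty ones from the store. The types below (a plain `HashMap` store, the toy `PeerState`) are illustrative stand-ins, not the actual lightning-liquidity code, which uses LDK's `KVStore` and the LSPS2/LSPS5 peer state structs shown in the diffs below.

```rust
use std::collections::HashMap;

// Toy stand-in for the per-protocol peer state; the real structs live in
// lsps2/service.rs and lsps5/service.rs.
struct PeerState {
	pending_requests: Vec<u64>,
	needs_persist: bool,
}

impl PeerState {
	fn prune_expired(&mut self) {
		// Pretend every pending request has expired; dropping data marks the
		// state dirty so the next persist pass rewrites (or removes) the entry.
		if !self.pending_requests.is_empty() {
			self.pending_requests.clear();
			self.needs_persist = true;
		}
	}

	fn is_prunable(&self) -> bool {
		self.pending_requests.is_empty()
	}
}

/// One persist pass: prune in memory, then sync the store to match.
fn persist_pass(
	peers: &mut HashMap<String, PeerState>, store: &mut HashMap<String, Vec<u8>>,
) {
	let mut need_remove = Vec::new();
	let mut need_persist = Vec::new();

	peers.retain(|key, state| {
		state.prune_expired();
		if state.is_prunable() {
			// Previously such entries were dropped in memory but left in the store.
			need_remove.push(key.clone());
			false
		} else {
			if state.needs_persist {
				need_persist.push(key.clone());
			}
			true
		}
	});

	for key in need_persist {
		store.insert(key, vec![]); // serialize + write in the real code
	}
	for key in need_remove {
		store.remove(&key); // the step this commit adds
	}
}

fn main() {
	let mut peers = HashMap::new();
	peers.insert("peer_a".to_owned(), PeerState { pending_requests: vec![1], needs_persist: false });
	let mut store = HashMap::new();
	store.insert("peer_a".to_owned(), vec![]);

	persist_pass(&mut peers, &mut store);
	assert!(peers.is_empty() && store.is_empty());
}
```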

File tree

3 files changed: +79 -47 lines changed

lightning-liquidity/src/lsps2/service.rs

Lines changed: 37 additions & 28 deletions
@@ -514,7 +514,7 @@ impl PeerState {
 			// We abort the flow, and prune any data kept.
 			self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
 			self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
-			// TODO: Remove peer state entry from the KVStore
+			self.needs_persist |= true;
 			return false;
 		}
 		true
@@ -1645,44 +1645,53 @@ where
 		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.
-		let need_persist: Vec<PublicKey> = {
-			let outer_state_lock = self.per_peer_state.read().unwrap();
-			outer_state_lock
-				.iter()
-				.filter_map(|(k, v)| if v.lock().unwrap().needs_persist { Some(*k) } else { None })
-				.collect()
-		};
+
+		let mut need_remove = Vec::new();
+		let mut need_persist = Vec::new();
+
+		{
+			let mut outer_state_lock = self.per_peer_state.write().unwrap();
+			outer_state_lock.retain(|counterparty_node_id, inner_state_lock| {
+				let mut peer_state_lock = inner_state_lock.lock().unwrap();
+				peer_state_lock.prune_expired_request_state();
+				let is_prunable = peer_state_lock.is_prunable();
+				if is_prunable {
+					need_remove.push(*counterparty_node_id);
+				} else if peer_state_lock.needs_persist {
+					need_persist.push(*counterparty_node_id);
+				}
+				!is_prunable
+			});
+		}
 
 		for counterparty_node_id in need_persist.into_iter() {
+			debug_assert!(!need_remove.contains(&counterparty_node_id));
 			self.persist_peer_state(counterparty_node_id).await?;
 		}
 
+		for counterparty_node_id in need_remove {
+			let key = counterparty_node_id.to_string();
+			self.kv_store
+				.remove(
+					LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+					LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+					&key,
+					true,
+				)
+				.await?;
+		}
+
 		Ok(())
 	}
 
 	pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
-		let mut outer_state_lock = self.per_peer_state.write().unwrap();
-		let is_prunable =
-			if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
-				let mut peer_state_lock = inner_state_lock.lock().unwrap();
-				peer_state_lock.prune_expired_request_state();
-				peer_state_lock.is_prunable()
-			} else {
-				return;
-			};
-		if is_prunable {
-			outer_state_lock.remove(&counterparty_node_id);
-		}
-	}
-
-	#[allow(clippy::bool_comparison)]
-	pub(crate) fn prune_peer_state(&self) {
-		let mut outer_state_lock = self.per_peer_state.write().unwrap();
-		outer_state_lock.retain(|_, inner_state_lock| {
+		let outer_state_lock = self.per_peer_state.write().unwrap();
+		if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
 			let mut peer_state_lock = inner_state_lock.lock().unwrap();
+			// We clean up the peer state, but leave removing the peer entry to the prune logic in
+			// `persist` which removes it from the store.
 			peer_state_lock.prune_expired_request_state();
-			peer_state_lock.is_prunable() == false
-		});
+		}
 	}
 }
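
A design point worth calling out in the hunk above: the keys to persist and remove are collected while the `per_peer_state` write lock is held, and the async `KVStore` calls only happen after that lock guard is dropped, so no `std` lock is held across an `.await`. A stripped-down sketch of that two-phase shape, with a hypothetical async store in place of the real `KVStore`:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical async store used only for illustration; the real code calls
// LDK's `KVStore::remove(primary_namespace, secondary_namespace, key, lazy)`.
struct Store;

impl Store {
	async fn remove(&self, _key: &str) -> Result<(), ()> {
		Ok(())
	}
}

struct Service {
	// `true` means the peer state is empty and can be pruned.
	per_peer_state: RwLock<HashMap<String, bool>>,
	store: Store,
}

impl Service {
	async fn persist(&self) -> Result<(), ()> {
		// Phase 1: decide what to delete while holding the write lock...
		let need_remove: Vec<String> = {
			let mut peers = self.per_peer_state.write().unwrap();
			let mut remove = Vec::new();
			peers.retain(|key, is_prunable| {
				if *is_prunable {
					remove.push(key.clone());
				}
				!*is_prunable
			});
			remove
		}; // ...and drop the guard here, before any `.await`.

		// Phase 2: perform the async store I/O without the lock held.
		for key in need_remove {
			self.store.remove(&key).await?;
		}
		Ok(())
	}
}
```

If the resulting future needs to be `Send` (e.g., to be spawned on a multi-threaded runtime), holding the `RwLock` guard across an `.await` would not compile, which is one more reason for the collect-then-await split.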

lightning-liquidity/src/lsps5/service.rs

Lines changed: 42 additions & 16 deletions
@@ -242,18 +242,41 @@ where
 		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.
-		let need_persist: Vec<PublicKey> = {
-			let outer_state_lock = self.per_peer_state.read().unwrap();
-			outer_state_lock
-				.iter()
-				.filter_map(|(k, v)| if v.needs_persist { Some(*k) } else { None })
-				.collect()
+		let mut need_remove = Vec::new();
+		let mut need_persist = Vec::new();
+		{
+			let mut outer_state_lock = self.per_peer_state.write().unwrap();
+			self.check_prune_stale_webhooks(&mut outer_state_lock);
+
+			outer_state_lock.retain(|client_id, peer_state| {
+				let is_prunable = peer_state.is_prunable();
+				let has_open_channel = self.client_has_open_channel(client_id);
+				if is_prunable && !has_open_channel {
+					need_remove.push(*client_id);
+				} else if peer_state.needs_persist {
+					need_persist.push(*client_id);
+				}
+				!is_prunable || has_open_channel
+			});
 		};
 
 		for counterparty_node_id in need_persist.into_iter() {
+			debug_assert!(!need_remove.contains(&counterparty_node_id));
 			self.persist_peer_state(counterparty_node_id).await?;
 		}
 
+		for counterparty_node_id in need_remove {
+			let key = counterparty_node_id.to_string();
+			self.kv_store
+				.remove(
+					LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+					LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+					&key,
+					true,
+				)
+				.await?;
+		}
+
 		Ok(())
 	}
 
@@ -269,14 +292,11 @@ where
 		});
 
 		if should_prune {
-			outer_state_lock.retain(|client_id, peer_state| {
-				if self.client_has_open_channel(client_id) {
-					// Don't prune clients with open channels
-					return true;
-				}
-				// TODO: Remove peer state entry from the KVStore
-				!peer_state.prune_stale_webhooks(now)
-			});
+			for (_, peer_state) in outer_state_lock.iter_mut() {
+				// Prune stale webhooks, but leave removal of the peers states to the prune logic
+				// in `persist` which will remove it from the store.
+				peer_state.prune_stale_webhooks(now)
+			}
 			*last_pruning = Some(now);
 		}
 	}
@@ -732,11 +752,17 @@ impl PeerState {
 	}
 
 	// Returns whether the entire state is empty and can be pruned.
-	fn prune_stale_webhooks(&mut self, now: LSPSDateTime) -> bool {
+	fn prune_stale_webhooks(&mut self, now: LSPSDateTime) {
 		self.webhooks.retain(|(_, webhook)| {
-			now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS
+			let should_prune = now.duration_since(&webhook.last_used) >= MIN_WEBHOOK_RETENTION_DAYS;
+			if should_prune {
+				self.needs_persist |= true;
+			}
+			!should_prune
 		});
+	}
 
+	fn is_prunable(&mut self) -> bool {
 		self.webhooks.is_empty()
 	}
 }
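
The `PeerState` change above splits responsibilities: `prune_stale_webhooks` now only drops stale webhooks and flags `needs_persist`, while emptiness is queried separately via `is_prunable`. A small standalone sketch of that retain-plus-dirty-flag shape, with plain integer timestamps standing in for `LSPSDateTime` and `MIN_WEBHOOK_RETENTION_DAYS`:

```rust
struct Webhook {
	last_used: u64,
}

struct PeerState {
	webhooks: Vec<Webhook>,
	needs_persist: bool,
}

impl PeerState {
	fn prune_stale_webhooks(&mut self, now: u64, retention: u64) {
		let mut pruned_any = false;
		self.webhooks.retain(|webhook| {
			let stale = now.saturating_sub(webhook.last_used) >= retention;
			if stale {
				pruned_any = true;
			}
			!stale
		});
		if pruned_any {
			// Dropping a webhook changes the serialized state, so mark it dirty;
			// the next persist pass rewrites the entry or removes it if now empty.
			self.needs_persist = true;
		}
	}

	fn is_prunable(&self) -> bool {
		self.webhooks.is_empty()
	}
}

fn main() {
	let mut state = PeerState {
		webhooks: vec![Webhook { last_used: 0 }, Webhook { last_used: 90 }],
		needs_persist: false,
	};
	state.prune_stale_webhooks(100, 30);
	assert!(state.needs_persist && !state.is_prunable());
}
```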

lightning-liquidity/src/manager.rs

Lines changed: 0 additions & 3 deletions
@@ -1015,9 +1015,6 @@ where
 		*self.best_block.write().unwrap() = Some(new_best_block);
 
 		// TODO: Call best_block_updated on all sub-modules that require it, e.g., LSPS1MessageHandler.
-		if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
-			lsps2_service_handler.prune_peer_state();
-		}
 	}
 
 	fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {
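
Finally, manager.rs drops the block-driven `prune_peer_state()` call: pruning no longer hangs off `best_block_updated` but rides along with the background-processor-driven `persist` calls. A rough, hypothetical sketch of such a periodic driver loop is below; the `LiquidityHandle::persist` entry point and the tokio timer are illustrative assumptions, not the actual background-processor wiring.

```rust
use std::time::Duration;

// Hypothetical handle standing in for whatever exposes the async persist call;
// in practice this is driven by LDK's background processor rather than hand-rolled.
struct LiquidityHandle;

impl LiquidityHandle {
	async fn persist(&self) -> Result<(), std::io::Error> {
		// In the real code this walks the LSPS2/LSPS5 peer states, rewrites dirty
		// entries, and removes pruned ones from the KVStore (see the diffs above).
		Ok(())
	}
}

async fn persist_loop(handle: &LiquidityHandle, interval: Duration) {
	loop {
		if let Err(e) = handle.persist().await {
			eprintln!("liquidity persistence failed: {e}");
		}
		tokio::time::sleep(interval).await;
	}
}
```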
