
Commit a0ca100

Merge pull request #4118 from TheBlueMatt/2025-09-lsps-persistence-races
#4059 followups
2 parents 3aa9187 + a9ddf3f

3 files changed: +163 -70 lines changed


lightning-liquidity/src/lsps2/event.rs

Lines changed: 1 addition & 1 deletion
@@ -163,7 +163,7 @@ pub enum LSPS2ServiceEvent {
 	///
 	/// **Note: ** As this event is persisted and might get replayed after restart, you'll need to
 	/// ensure channel creation idempotency. I.e., please check if you already created a
-	/// corresponding channel based on the given `their_network_key` and `intercept_scid` and
+	/// corresponding channel based on the given `their_network_key` and `user_channel_id` and
 	/// ignore this event in case you did.
 	///
 	/// [`ChannelManager::create_channel`]: lightning::ln::channelmanager::ChannelManager::create_channel
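
The doc fix above points integrators at `their_network_key` plus `user_channel_id`, rather than `intercept_scid`, as the deduplication key for replayed events. As a rough illustration of the intended idempotency check, a handler can track which pairs it has already acted on and skip replays. The `ChannelOpener` trait and `IdempotentOpener` struct below are hypothetical stand-ins, not part of the lightning-liquidity API, and a real handler would persist this set or rebuild it from `ChannelManager` state on startup:

use std::collections::HashSet;

/// Hypothetical stand-in for whatever actually opens channels (for example, a
/// thin wrapper around `ChannelManager::create_channel`).
trait ChannelOpener {
	fn open_channel(&self, their_network_key: [u8; 33], user_channel_id: u128);
}

/// Tracks which (counterparty, user_channel_id) pairs were already handled so a
/// replayed `OpenChannel` event after restart becomes a no-op.
struct IdempotentOpener<O: ChannelOpener> {
	opener: O,
	already_opened: HashSet<([u8; 33], u128)>,
}

impl<O: ChannelOpener> IdempotentOpener<O> {
	fn handle_open_channel_event(
		&mut self, their_network_key: [u8; 33], user_channel_id: u128,
	) {
		// `insert` returns false if the pair was already present, i.e. the event
		// is a replay and must be ignored rather than acted on again.
		if self.already_opened.insert((their_network_key, user_channel_id)) {
			self.opener.open_channel(their_network_key, user_channel_id);
		}
	}
}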

lightning-liquidity/src/lsps2/service.rs

Lines changed: 84 additions & 39 deletions
@@ -593,6 +593,7 @@ where
 	peer_by_channel_id: RwLock<HashMap<ChannelId, PublicKey>>,
 	total_pending_requests: AtomicUsize,
 	config: LSPS2ServiceConfig,
+	persistence_in_flight: AtomicUsize,
 }

 impl<CM: Deref, K: Deref + Clone> LSPS2ServiceHandler<CM, K>
@@ -640,6 +641,7 @@ where
 			peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
 			peer_by_channel_id: RwLock::new(peer_by_channel_id),
 			total_pending_requests: AtomicUsize::new(0),
+			persistence_in_flight: AtomicUsize::new(0),
 			channel_manager,
 			kv_store,
 			config,
@@ -1603,7 +1605,7 @@ where
 	) -> Result<(), lightning::io::Error> {
 		let fut = {
 			let outer_state_lock = self.per_peer_state.read().unwrap();
-			let encoded = match outer_state_lock.get(&counterparty_node_id) {
+			match outer_state_lock.get(&counterparty_node_id) {
 				None => {
 					// We dropped the peer state by now.
 					return Ok(());
@@ -1615,18 +1617,19 @@
 						return Ok(());
 					} else {
 						peer_state_lock.needs_persist = false;
-						peer_state_lock.encode()
+						let key = counterparty_node_id.to_string();
+						let encoded = peer_state_lock.encode();
+						// Begin the write with the entry lock held. This avoids racing with
+						// potentially-in-flight `persist` calls writing state for the same peer.
+						self.kv_store.write(
+							LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+							LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+							&key,
+							encoded,
+						)
 					}
 				},
-			};
-			let key = counterparty_node_id.to_string();
-
-			self.kv_store.write(
-				LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-				LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
-				&key,
-				encoded,
-			)
+			}
 		};

 		fut.await.map_err(|e| {
@@ -1644,38 +1647,80 @@ where
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.

-		let mut need_remove = Vec::new();
-		let mut need_persist = Vec::new();
+		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
+			// If we're not the first event processor to get here, just return early, the increment
+			// we just did will be treated as "go around again" at the end.
+			return Ok(());
+		}

-		{
-			let mut outer_state_lock = self.per_peer_state.write().unwrap();
-			outer_state_lock.retain(|counterparty_node_id, inner_state_lock| {
-				let mut peer_state_lock = inner_state_lock.lock().unwrap();
-				peer_state_lock.prune_expired_request_state();
-				let is_prunable = peer_state_lock.is_prunable();
-				if is_prunable {
-					need_remove.push(*counterparty_node_id);
-				} else if peer_state_lock.needs_persist {
-					need_persist.push(*counterparty_node_id);
+		loop {
+			let mut need_remove = Vec::new();
+			let mut need_persist = Vec::new();
+
+			{
+				// First build a list of peers to persist and prune with the read lock. This allows
+				// us to avoid the write lock unless we actually need to remove a node.
+				let outer_state_lock = self.per_peer_state.read().unwrap();
+				for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
+					let mut peer_state_lock = inner_state_lock.lock().unwrap();
+					peer_state_lock.prune_expired_request_state();
+					let is_prunable = peer_state_lock.is_prunable();
+					if is_prunable {
+						need_remove.push(*counterparty_node_id);
+					} else if peer_state_lock.needs_persist {
+						need_persist.push(*counterparty_node_id);
+					}
 				}
-				!is_prunable
-			});
-		}
+			}

-		for counterparty_node_id in need_persist.into_iter() {
-			debug_assert!(!need_remove.contains(&counterparty_node_id));
-			self.persist_peer_state(counterparty_node_id).await?;
-		}
+			for counterparty_node_id in need_persist.into_iter() {
+				debug_assert!(!need_remove.contains(&counterparty_node_id));
+				self.persist_peer_state(counterparty_node_id).await?;
+			}

-		for counterparty_node_id in need_remove {
-			let key = counterparty_node_id.to_string();
-			self.kv_store
-				.remove(
-					LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-					LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
-					&key,
-				)
-				.await?;
+			for counterparty_node_id in need_remove {
+				let mut future_opt = None;
+				{
+					// We need to take the `per_peer_state` write lock to remove an entry, but also
+					// have to hold it until after the `remove` call returns (but not through
+					// future completion) to ensure that writes for the peer's state are
+					// well-ordered with other `persist_peer_state` calls even across the removal
+					// itself.
+					let mut per_peer_state = self.per_peer_state.write().unwrap();
+					if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) {
+						let state = entry.get_mut().get_mut().unwrap();
+						if state.is_prunable() {
+							entry.remove();
+							let key = counterparty_node_id.to_string();
+							future_opt = Some(self.kv_store.remove(
+								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+								LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+								&key,
+							));
+						} else {
+							// If the peer got new state, force a re-persist of the current state.
+							state.needs_persist = true;
+						}
+					} else {
+						// This should never happen, we can only have one `persist` call
+						// in-progress at once and map entries are only removed by it.
+						debug_assert!(false);
+					}
+				}
+				if let Some(future) = future_opt {
+					future.await?;
+				} else {
+					self.persist_peer_state(counterparty_node_id).await?;
+				}
+			}
+
+			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
+				// If another thread incremented the state while we were running we should go
+				// around again, but only once.
+				self.persistence_in_flight.store(1, Ordering::Release);
+				continue;
+			}
+			break;
 		}

 		Ok(())
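
The `persistence_in_flight` counter added above implements a single-flight guard: the first caller into `persist` does the work, concurrent callers return immediately, and their increments force the running caller to loop exactly one more time so no freshly-dirtied state is skipped. A stripped-down sketch of the same pattern, with a hypothetical `persist_all_peers` closure standing in for the actual per-peer persistence pass, might look like this:

use std::sync::atomic::{AtomicUsize, Ordering};

struct SingleFlightPersister {
	in_flight: AtomicUsize,
}

impl SingleFlightPersister {
	fn new() -> Self {
		Self { in_flight: AtomicUsize::new(0) }
	}

	fn persist(&self, persist_all_peers: impl Fn()) {
		// If another call is already running, record our request and return; the
		// running call will observe the increment and do another pass for us.
		if self.in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
			return;
		}
		loop {
			persist_all_peers();
			// If nobody arrived while we were working, we're done. Otherwise,
			// collapse however many requests came in into a single extra pass.
			if self.in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
				self.in_flight.store(1, Ordering::Release);
				continue;
			}
			break;
		}
	}
}

As in the diff, storing 1 after the decrement collapses any number of queued requests into one extra iteration rather than one iteration per caller.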

lightning-liquidity/src/lsps5/service.rs

Lines changed: 78 additions & 30 deletions
@@ -20,6 +20,7 @@ use crate::message_queue::MessageQueue;
 use crate::persist::{
 	LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
 };
+use crate::prelude::hash_map::Entry;
 use crate::prelude::*;
 use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
 use crate::utils::time::TimeProvider;
@@ -35,6 +36,7 @@ use lightning::util::persist::KVStore;
 use lightning::util::ser::Writeable;

 use core::ops::Deref;
+use core::sync::atomic::{AtomicUsize, Ordering};
 use core::time::Duration;

 use alloc::string::String;
@@ -139,6 +141,7 @@ where
 	node_signer: NS,
 	kv_store: K,
 	last_pruning: Mutex<Option<LSPSDateTime>>,
+	persistence_in_flight: AtomicUsize,
 }

 impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPS5ServiceHandler<CM, NS, K, TP>
@@ -166,6 +169,7 @@ where
 			node_signer,
 			kv_store,
 			last_pruning: Mutex::new(None),
+			persistence_in_flight: AtomicUsize::new(0),
 		}
 	}

@@ -220,6 +224,8 @@ where

 		let key = counterparty_node_id.to_string();

+		// Begin the write with the `per_peer_state` write lock held to avoid racing with
+		// potentially-in-flight `persist` calls writing state for the same peer.
 		self.kv_store.write(
 			LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
 			LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -242,38 +248,80 @@ where
 		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.
-		let mut need_remove = Vec::new();
-		let mut need_persist = Vec::new();
-		{
-			let mut outer_state_lock = self.per_peer_state.write().unwrap();
-			self.check_prune_stale_webhooks(&mut outer_state_lock);
-
-			outer_state_lock.retain(|client_id, peer_state| {
-				let is_prunable = peer_state.is_prunable();
-				let has_open_channel = self.client_has_open_channel(client_id);
-				if is_prunable && !has_open_channel {
-					need_remove.push(*client_id);
-				} else if peer_state.needs_persist {
-					need_persist.push(*client_id);
-				}
-				!is_prunable || has_open_channel
-			});
-		};

-		for counterparty_node_id in need_persist.into_iter() {
-			debug_assert!(!need_remove.contains(&counterparty_node_id));
-			self.persist_peer_state(counterparty_node_id).await?;
+		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
+			// If we're not the first event processor to get here, just return early, the increment
+			// we just did will be treated as "go around again" at the end.
+			return Ok(());
 		}

-		for counterparty_node_id in need_remove {
-			let key = counterparty_node_id.to_string();
-			self.kv_store
-				.remove(
-					LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-					LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
-					&key,
-				)
-				.await?;
+		loop {
+			let mut need_remove = Vec::new();
+			let mut need_persist = Vec::new();
+
+			self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
+			{
+				let outer_state_lock = self.per_peer_state.read().unwrap();
+
+				for (client_id, peer_state) in outer_state_lock.iter() {
+					let is_prunable = peer_state.is_prunable();
+					let has_open_channel = self.client_has_open_channel(client_id);
+					if is_prunable && !has_open_channel {
+						need_remove.push(*client_id);
+					} else if peer_state.needs_persist {
+						need_persist.push(*client_id);
+					}
+				}
+			}
+
+			for client_id in need_persist.into_iter() {
+				debug_assert!(!need_remove.contains(&client_id));
+				self.persist_peer_state(client_id).await?;
+			}
+
+			for client_id in need_remove {
+				let mut future_opt = None;
+				{
+					// We need to take the `per_peer_state` write lock to remove an entry, but also
+					// have to hold it until after the `remove` call returns (but not through
+					// future completion) to ensure that writes for the peer's state are
+					// well-ordered with other `persist_peer_state` calls even across the removal
+					// itself.
+					let mut per_peer_state = self.per_peer_state.write().unwrap();
+					if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
+						let state = entry.get_mut();
+						if state.is_prunable() && !self.client_has_open_channel(&client_id) {
+							entry.remove();
+							let key = client_id.to_string();
+							future_opt = Some(self.kv_store.remove(
+								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+								LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+								&key,
+							));
+						} else {
+							// If the peer was re-added, force a re-persist of the current state.
+							state.needs_persist = true;
+						}
+					} else {
+						// This should never happen, we can only have one `persist` call
+						// in-progress at once and map entries are only removed by it.
+						debug_assert!(false);
+					}
+				}
+				if let Some(future) = future_opt {
+					future.await?;
+				} else {
+					self.persist_peer_state(client_id).await?;
+				}
+			}
+
+			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
+				// If another thread incremented the state while we were running we should go
+				// around again, but only once.
+				self.persistence_in_flight.store(1, Ordering::Release);
+				continue;
+			}
+			break;
 		}

 		Ok(())
@@ -761,7 +809,7 @@ impl PeerState {
 		});
 	}

-	fn is_prunable(&mut self) -> bool {
+	fn is_prunable(&self) -> bool {
 		self.webhooks.is_empty()
 	}
 }
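
In both service handlers the `KVStore::remove` future is now created while the `per_peer_state` write lock is still held, and only awaited after the lock is dropped. A simplified sketch of that ordering discipline, assuming a hypothetical async `Store` trait and `PeerStates` wrapper rather than the real LDK types, could look like this:

use std::collections::HashMap;
use std::future::Future;
use std::sync::RwLock;

/// Hypothetical async key-value store; only *starting* the removal has to be
/// ordered against concurrent per-peer writes, not waiting for its completion.
trait Store {
	fn remove(&self, key: &str) -> impl Future<Output = Result<(), std::io::Error>>;
}

struct PeerStates<S: Store> {
	per_peer_state: RwLock<HashMap<String, Vec<u8>>>,
	store: S,
}

impl<S: Store> PeerStates<S> {
	async fn prune_peer(&self, key: &str) -> Result<(), std::io::Error> {
		let removal = {
			// Hold the write lock while the removal is initiated, so it cannot be
			// reordered against writes for the same peer that are also started
			// under a lock.
			let mut map = self.per_peer_state.write().unwrap();
			map.remove(key).map(|_removed_state| self.store.remove(key))
		};
		// Await outside the lock so other peers are not blocked on the I/O.
		if let Some(fut) = removal {
			fut.await?;
		}
		Ok(())
	}
}

This only mirrors the shape of the change; the real handlers additionally re-check `is_prunable` under the write lock and fall back to re-persisting when the peer gained new state in the meantime.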
