Skip to content

Commit 4c9c211

Browse files
authored
Merge pull request #4059 from tnull/2025-01-liquidity-persistence
`lightning-liquidity`: Add serialization logic, persist service state
2 parents 3e21ba3 + adc8a87 commit 4c9c211

34 files changed

+2593
-324
lines changed

fuzz/src/lsps_message.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use lightning::util::test_utils::{
2121
};
2222

2323
use lightning_liquidity::lsps0::ser::LSPS_MESSAGE_TYPE_ID;
24-
use lightning_liquidity::LiquidityManager;
24+
use lightning_liquidity::LiquidityManagerSync;
2525

2626
use core::time::Duration;
2727

@@ -77,15 +77,16 @@ pub fn do_test(data: &[u8]) {
7777
genesis_block.header.time,
7878
));
7979

80-
let liquidity_manager = Arc::new(LiquidityManager::new(
80+
let liquidity_manager = Arc::new(LiquidityManagerSync::new(
8181
Arc::clone(&keys_manager),
8282
Arc::clone(&keys_manager),
8383
Arc::clone(&manager),
8484
None::<Arc<dyn Filter + Send + Sync>>,
8585
None,
86+
kv_store,
8687
None,
8788
None,
88-
));
89+
).unwrap());
8990
let mut reader = data;
9091
if let Ok(Some(msg)) = liquidity_manager.read(LSPS_MESSAGE_TYPE_ID, &mut reader) {
9192
let secp = Secp256k1::signing_only();

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ possiblyrandom = { version = "0.2", path = "../possiblyrandom", default-features
3131
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
3232
lightning = { version = "0.2.0", path = "../lightning", features = ["_test_utils"] }
3333
lightning-invoice = { version = "0.34.0", path = "../lightning-invoice" }
34+
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false, features = ["_test_utils"] }
3435
lightning-persister = { version = "0.2.0", path = "../lightning-persister" }
3536

3637
[lints]

lightning-background-processor/src/lib.rs

Lines changed: 110 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ use lightning::util::wakers::Sleeper;
6969
use lightning_rapid_gossip_sync::RapidGossipSync;
7070

7171
use lightning_liquidity::ALiquidityManager;
72+
#[cfg(feature = "std")]
73+
use lightning_liquidity::ALiquidityManagerSync;
7274

7375
use core::ops::Deref;
7476
use core::time::Duration;
@@ -424,6 +426,31 @@ pub const NO_LIQUIDITY_MANAGER: Option<
424426
CM = &DynChannelManager,
425427
Filter = dyn chain::Filter,
426428
C = &dyn chain::Filter,
429+
KVStore = dyn lightning::util::persist::KVStore,
430+
K = &dyn lightning::util::persist::KVStore,
431+
TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider,
432+
TP = &dyn lightning_liquidity::utils::time::TimeProvider,
433+
> + Send
434+
+ Sync,
435+
>,
436+
> = None;
437+
438+
/// When initializing a background processor without a liquidity manager, this can be used to avoid
439+
/// specifying a concrete `LiquidityManagerSync` type.
440+
#[cfg(all(not(c_bindings), feature = "std"))]
441+
pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
442+
Arc<
443+
dyn ALiquidityManagerSync<
444+
EntropySource = dyn EntropySource,
445+
ES = &dyn EntropySource,
446+
NodeSigner = dyn lightning::sign::NodeSigner,
447+
NS = &dyn lightning::sign::NodeSigner,
448+
AChannelManager = DynChannelManager,
449+
CM = &DynChannelManager,
450+
Filter = dyn chain::Filter,
451+
C = &dyn chain::Filter,
452+
KVStoreSync = dyn lightning::util::persist::KVStoreSync,
453+
KS = &dyn lightning::util::persist::KVStoreSync,
427454
TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider,
428455
TP = &dyn lightning_liquidity::utils::time::TimeProvider,
429456
> + Send
@@ -544,45 +571,49 @@ pub(crate) mod futures_util {
544571
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
545572
}
546573

547-
enum JoinerResult<E, F: Future<Output = Result<(), E>> + Unpin> {
574+
enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
548575
Pending(Option<F>),
549-
Ready(Result<(), E>),
576+
Ready(Result<(), ERR>),
550577
}
551578

552579
pub(crate) struct Joiner<
553-
E,
554-
A: Future<Output = Result<(), E>> + Unpin,
555-
B: Future<Output = Result<(), E>> + Unpin,
556-
C: Future<Output = Result<(), E>> + Unpin,
557-
D: Future<Output = Result<(), E>> + Unpin,
580+
ERR,
581+
A: Future<Output = Result<(), ERR>> + Unpin,
582+
B: Future<Output = Result<(), ERR>> + Unpin,
583+
C: Future<Output = Result<(), ERR>> + Unpin,
584+
D: Future<Output = Result<(), ERR>> + Unpin,
585+
E: Future<Output = Result<(), ERR>> + Unpin,
558586
> {
559-
a: JoinerResult<E, A>,
560-
b: JoinerResult<E, B>,
561-
c: JoinerResult<E, C>,
562-
d: JoinerResult<E, D>,
587+
a: JoinerResult<ERR, A>,
588+
b: JoinerResult<ERR, B>,
589+
c: JoinerResult<ERR, C>,
590+
d: JoinerResult<ERR, D>,
591+
e: JoinerResult<ERR, E>,
563592
}
564593

565594
impl<
566-
E,
567-
A: Future<Output = Result<(), E>> + Unpin,
568-
B: Future<Output = Result<(), E>> + Unpin,
569-
C: Future<Output = Result<(), E>> + Unpin,
570-
D: Future<Output = Result<(), E>> + Unpin,
571-
> Joiner<E, A, B, C, D>
595+
ERR,
596+
A: Future<Output = Result<(), ERR>> + Unpin,
597+
B: Future<Output = Result<(), ERR>> + Unpin,
598+
C: Future<Output = Result<(), ERR>> + Unpin,
599+
D: Future<Output = Result<(), ERR>> + Unpin,
600+
E: Future<Output = Result<(), ERR>> + Unpin,
601+
> Joiner<ERR, A, B, C, D, E>
572602
{
573603
pub(crate) fn new() -> Self {
574604
Self {
575605
a: JoinerResult::Pending(None),
576606
b: JoinerResult::Pending(None),
577607
c: JoinerResult::Pending(None),
578608
d: JoinerResult::Pending(None),
609+
e: JoinerResult::Pending(None),
579610
}
580611
}
581612

582613
pub(crate) fn set_a(&mut self, fut: A) {
583614
self.a = JoinerResult::Pending(Some(fut));
584615
}
585-
pub(crate) fn set_a_res(&mut self, res: Result<(), E>) {
616+
pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
586617
self.a = JoinerResult::Ready(res);
587618
}
588619
pub(crate) fn set_b(&mut self, fut: B) {
@@ -594,19 +625,23 @@ pub(crate) mod futures_util {
594625
pub(crate) fn set_d(&mut self, fut: D) {
595626
self.d = JoinerResult::Pending(Some(fut));
596627
}
628+
pub(crate) fn set_e(&mut self, fut: E) {
629+
self.e = JoinerResult::Pending(Some(fut));
630+
}
597631
}
598632

599633
impl<
600-
E,
601-
A: Future<Output = Result<(), E>> + Unpin,
602-
B: Future<Output = Result<(), E>> + Unpin,
603-
C: Future<Output = Result<(), E>> + Unpin,
604-
D: Future<Output = Result<(), E>> + Unpin,
605-
> Future for Joiner<E, A, B, C, D>
634+
ERR,
635+
A: Future<Output = Result<(), ERR>> + Unpin,
636+
B: Future<Output = Result<(), ERR>> + Unpin,
637+
C: Future<Output = Result<(), ERR>> + Unpin,
638+
D: Future<Output = Result<(), ERR>> + Unpin,
639+
E: Future<Output = Result<(), ERR>> + Unpin,
640+
> Future for Joiner<ERR, A, B, C, D, E>
606641
where
607-
Joiner<E, A, B, C, D>: Unpin,
642+
Joiner<ERR, A, B, C, D, E>: Unpin,
608643
{
609-
type Output = [Result<(), E>; 4];
644+
type Output = [Result<(), ERR>; 5];
610645
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
611646
let mut all_complete = true;
612647
macro_rules! handle {
@@ -615,7 +650,7 @@ pub(crate) mod futures_util {
615650
JoinerResult::Pending(None) => {
616651
self.$val = JoinerResult::Ready(Ok(()));
617652
},
618-
JoinerResult::<E, _>::Pending(Some(ref mut val)) => {
653+
JoinerResult::<ERR, _>::Pending(Some(ref mut val)) => {
619654
match Pin::new(val).poll(ctx) {
620655
Poll::Ready(res) => {
621656
self.$val = JoinerResult::Ready(res);
@@ -633,9 +668,10 @@ pub(crate) mod futures_util {
633668
handle!(b);
634669
handle!(c);
635670
handle!(d);
671+
handle!(e);
636672

637673
if all_complete {
638-
let mut res = [Ok(()), Ok(()), Ok(()), Ok(())];
674+
let mut res = [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())];
639675
if let JoinerResult::Ready(ref mut val) = &mut self.a {
640676
core::mem::swap(&mut res[0], val);
641677
}
@@ -648,6 +684,9 @@ pub(crate) mod futures_util {
648684
if let JoinerResult::Ready(ref mut val) = &mut self.d {
649685
core::mem::swap(&mut res[3], val);
650686
}
687+
if let JoinerResult::Ready(ref mut val) = &mut self.e {
688+
core::mem::swap(&mut res[4], val);
689+
}
651690
Poll::Ready(res)
652691
} else {
653692
Poll::Pending
@@ -731,7 +770,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
731770
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
732771
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
733772
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
734-
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<DefaultTimeProvider>>;
773+
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<Store>, Arc<DefaultTimeProvider>>;
735774
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
736775
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
737776
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
@@ -976,7 +1015,7 @@ where
9761015
OptionalSelector { optional_future: None }
9771016
};
9781017
let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
979-
let fut = lm.get_lm().get_pending_msgs_future();
1018+
let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
9801019
OptionalSelector { optional_future: Some(fut) }
9811020
} else {
9821021
OptionalSelector { optional_future: None }
@@ -1179,6 +1218,17 @@ where
11791218
None => {},
11801219
}
11811220

1221+
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1222+
log_trace!(logger, "Persisting LiquidityManager...");
1223+
let fut = async {
1224+
liquidity_manager.get_lm().persist().await.map_err(|e| {
1225+
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1226+
e
1227+
})
1228+
};
1229+
futures.set_e(Box::pin(fut));
1230+
}
1231+
11821232
// Run persistence tasks in parallel and exit if any of them returns an error.
11831233
for res in futures.await {
11841234
res?;
@@ -1450,7 +1500,7 @@ impl BackgroundProcessor {
14501500
CM::Target: AChannelManager,
14511501
OM::Target: AOnionMessenger,
14521502
PM::Target: APeerManager,
1453-
LM::Target: ALiquidityManager,
1503+
LM::Target: ALiquidityManagerSync,
14541504
D::Target: ChangeDestinationSourceSync,
14551505
O::Target: 'static + OutputSpender,
14561506
K::Target: 'static + KVStoreSync,
@@ -1535,7 +1585,7 @@ impl BackgroundProcessor {
15351585
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
15361586
&chain_monitor.get_update_future(),
15371587
&om.get_om().get_update_future(),
1538-
&lm.get_lm().get_pending_msgs_future(),
1588+
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
15391589
),
15401590
(Some(om), None) => Sleeper::from_three_futures(
15411591
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
@@ -1545,7 +1595,7 @@ impl BackgroundProcessor {
15451595
(None, Some(lm)) => Sleeper::from_three_futures(
15461596
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
15471597
&chain_monitor.get_update_future(),
1548-
&lm.get_lm().get_pending_msgs_future(),
1598+
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
15491599
),
15501600
(None, None) => Sleeper::from_two_futures(
15511601
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
@@ -1579,6 +1629,13 @@ impl BackgroundProcessor {
15791629
log_trace!(logger, "Done persisting ChannelManager.");
15801630
}
15811631

1632+
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1633+
log_trace!(logger, "Persisting LiquidityManager...");
1634+
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
1635+
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1636+
});
1637+
}
1638+
15821639
// Note that we want to run a graph prune once not long after startup before
15831640
// falling back to our usual hourly prunes. This avoids short-lived clients never
15841641
// pruning their network graph. We run once 60 seconds after startup before
@@ -1793,7 +1850,7 @@ mod tests {
17931850
use lightning::util::test_utils;
17941851
use lightning::{get_event, get_event_msg};
17951852
use lightning_liquidity::utils::time::DefaultTimeProvider;
1796-
use lightning_liquidity::LiquidityManager;
1853+
use lightning_liquidity::{ALiquidityManagerSync, LiquidityManagerSync};
17971854
use lightning_persister::fs_store::FilesystemStore;
17981855
use lightning_rapid_gossip_sync::RapidGossipSync;
17991856
use std::collections::VecDeque;
@@ -1890,11 +1947,12 @@ mod tests {
18901947
IgnoringMessageHandler,
18911948
>;
18921949

1893-
type LM = LiquidityManager<
1950+
type LM = LiquidityManagerSync<
18941951
Arc<KeysManager>,
18951952
Arc<KeysManager>,
18961953
Arc<ChannelManager>,
18971954
Arc<dyn Filter + Sync + Send>,
1955+
Arc<Persister>,
18981956
Arc<DefaultTimeProvider>,
18991957
>;
19001958

@@ -2342,15 +2400,19 @@ mod tests {
23422400
Arc::clone(&logger),
23432401
Arc::clone(&keys_manager),
23442402
));
2345-
let liquidity_manager = Arc::new(LiquidityManager::new(
2346-
Arc::clone(&keys_manager),
2347-
Arc::clone(&keys_manager),
2348-
Arc::clone(&manager),
2349-
None,
2350-
None,
2351-
None,
2352-
None,
2353-
));
2403+
let liquidity_manager = Arc::new(
2404+
LiquidityManagerSync::new(
2405+
Arc::clone(&keys_manager),
2406+
Arc::clone(&keys_manager),
2407+
Arc::clone(&manager),
2408+
None,
2409+
None,
2410+
Arc::clone(&kv_store),
2411+
None,
2412+
None,
2413+
)
2414+
.unwrap(),
2415+
);
23542416
let node = Node {
23552417
node: manager,
23562418
p2p_gossip_sync,
@@ -2727,7 +2789,7 @@ mod tests {
27272789
Some(Arc::clone(&nodes[0].messenger)),
27282790
nodes[0].rapid_gossip_sync(),
27292791
Arc::clone(&nodes[0].peer_manager),
2730-
Some(Arc::clone(&nodes[0].liquidity_manager)),
2792+
Some(nodes[0].liquidity_manager.get_lm_async()),
27312793
Some(nodes[0].sweeper.sweeper_async()),
27322794
Arc::clone(&nodes[0].logger),
27332795
Some(Arc::clone(&nodes[0].scorer)),
@@ -3236,7 +3298,7 @@ mod tests {
32363298
Some(Arc::clone(&nodes[0].messenger)),
32373299
nodes[0].rapid_gossip_sync(),
32383300
Arc::clone(&nodes[0].peer_manager),
3239-
Some(Arc::clone(&nodes[0].liquidity_manager)),
3301+
Some(nodes[0].liquidity_manager.get_lm_async()),
32403302
Some(nodes[0].sweeper.sweeper_async()),
32413303
Arc::clone(&nodes[0].logger),
32423304
Some(Arc::clone(&nodes[0].scorer)),
@@ -3451,7 +3513,7 @@ mod tests {
34513513
Some(Arc::clone(&nodes[0].messenger)),
34523514
nodes[0].no_gossip_sync(),
34533515
Arc::clone(&nodes[0].peer_manager),
3454-
Some(Arc::clone(&nodes[0].liquidity_manager)),
3516+
Some(nodes[0].liquidity_manager.get_lm_async()),
34553517
Some(nodes[0].sweeper.sweeper_async()),
34563518
Arc::clone(&nodes[0].logger),
34573519
Some(Arc::clone(&nodes[0].scorer)),
@@ -3500,7 +3562,7 @@ mod tests {
35003562
crate::NO_ONION_MESSENGER,
35013563
nodes[0].no_gossip_sync(),
35023564
Arc::clone(&nodes[0].peer_manager),
3503-
crate::NO_LIQUIDITY_MANAGER,
3565+
crate::NO_LIQUIDITY_MANAGER_SYNC,
35043566
Some(Arc::clone(&nodes[0].sweeper)),
35053567
Arc::clone(&nodes[0].logger),
35063568
Some(Arc::clone(&nodes[0].scorer)),

lightning-liquidity/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ default = ["std", "time"]
1818
std = ["lightning/std"]
1919
time = ["std"]
2020
backtrace = ["dep:backtrace"]
21+
_test_utils = []
2122

2223
[dependencies]
2324
lightning = { version = "0.2.0", path = "../lightning", default-features = false }
2425
lightning-types = { version = "0.3.0", path = "../lightning-types", default-features = false }
2526
lightning-invoice = { version = "0.34.0", path = "../lightning-invoice", default-features = false, features = ["serde"] }
27+
lightning-macros = { version = "0.2", path = "../lightning-macros" }
2628

2729
bitcoin = { version = "0.32.2", default-features = false, features = ["serde"] }
2830

0 commit comments

Comments
 (0)