Skip to content

Commit 1127ab0

Browse files
committed
sweeper/bg proc fixes wip
1 parent 436e526 commit 1127ab0

File tree

3 files changed

+99
-94
lines changed

3 files changed

+99
-94
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ macro_rules! define_run_body {
375375

376376
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377377
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
378+
$persister.persist_manager(&$channel_manager).await?;
379379
log_trace!($logger, "Done persisting ChannelManager.");
380380
}
381381
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +436,7 @@ macro_rules! define_run_body {
436436
log_trace!($logger, "Persisting network graph.");
437437
}
438438

439-
if let Err(e) = $persister.persist_graph(network_graph) {
439+
if let Err(e) = $persister.persist_graph(network_graph).await {
440440
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441441
}
442442

@@ -464,7 +464,7 @@ macro_rules! define_run_body {
464464
} else {
465465
log_trace!($logger, "Persisting scorer");
466466
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
467+
if let Err(e) = $persister.persist_scorer(&scorer).await {
468468
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469469
}
470470
}
@@ -487,16 +487,16 @@ macro_rules! define_run_body {
487487
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488488
// some races where users quit while channel updates were in-flight, with
489489
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
490+
$persister.persist_manager(&$channel_manager).await?;
491491

492492
// Persist Scorer on exit
493493
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
494+
$persister.persist_scorer(&scorer).await?;
495495
}
496496

497497
// Persist NetworkGraph on exit
498498
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
499+
$persister.persist_graph(network_graph).await?;
500500
}
501501

502502
Ok(())
@@ -840,7 +840,7 @@ where
840840
if let Some(duration_since_epoch) = fetch_time() {
841841
if update_scorer(scorer, &event, duration_since_epoch) {
842842
log_trace!(logger, "Persisting scorer after update");
843-
if let Err(e) = persister.persist_scorer(&*scorer) {
843+
if let Err(e) = persister.persist_scorer(&*scorer).await {
844844
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
845845
// We opt not to abort early on persistence failure here as persisting
846846
// the scorer is non-critical and we still hope that it will have
@@ -1033,7 +1033,7 @@ impl BackgroundProcessor {
10331033
.expect("Time should be sometime after 1970");
10341034
if update_scorer(scorer, &event, duration_since_epoch) {
10351035
log_trace!(logger, "Persisting scorer after update");
1036-
if let Err(e) = persister.persist_scorer(&scorer) {
1036+
if let Err(e) = persister.persist_scorer(&scorer).await {
10371037
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
10381038
}
10391039
}

lightning-persister/src/fs_store.rs

Lines changed: 76 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -119,91 +119,102 @@ impl KVStore for FilesystemStore {
119119
Ok(buf)
120120
}
121121

122-
fn write(
122+
fn write_async(
123123
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
124-
) -> lightning::io::Result<()> {
125-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
126-
127-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
128-
dest_file_path.push(key);
129-
130-
let parent_directory = dest_file_path.parent().ok_or_else(|| {
131-
let msg =
132-
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
133-
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
134-
})?;
135-
fs::create_dir_all(&parent_directory)?;
136-
137-
// Do a crazy dance with lots of fsync()s to be overly cautious here...
138-
// We never want to end up in a state where we've lost the old data, or end up using the
139-
// old data on power loss after we've returned.
140-
// The way to atomically write a file on Unix platforms is:
141-
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
142-
let mut tmp_file_path = dest_file_path.clone();
143-
let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
144-
tmp_file_path.set_extension(tmp_file_ext);
145-
146-
{
147-
let mut tmp_file = fs::File::create(&tmp_file_path)?;
148-
tmp_file.write_all(&buf)?;
149-
tmp_file.sync_all()?;
150-
}
151-
152-
let res = {
153-
let inner_lock_ref = {
154-
let mut outer_lock = self.locks.lock().unwrap();
155-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
156-
};
157-
let _guard = inner_lock_ref.write().unwrap();
124+
) -> AsyncResultType<'static, (), lightning::io::Error> {
125+
Box::pin(async move {
126+
check_namespace_key_validity(
127+
primary_namespace,
128+
secondary_namespace,
129+
Some(key),
130+
"write",
131+
)?;
132+
133+
let mut dest_file_path =
134+
self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
135+
dest_file_path.push(key);
136+
137+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
138+
let msg =
139+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
140+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
141+
})?;
142+
fs::create_dir_all(&parent_directory)?;
143+
144+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
145+
// We never want to end up in a state where we've lost the old data, or end up using the
146+
// old data on power loss after we've returned.
147+
// The way to atomically write a file on Unix platforms is:
148+
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
149+
let mut tmp_file_path = dest_file_path.clone();
150+
let tmp_file_ext =
151+
format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
152+
tmp_file_path.set_extension(tmp_file_ext);
158153

159-
#[cfg(not(target_os = "windows"))]
160154
{
161-
fs::rename(&tmp_file_path, &dest_file_path)?;
162-
let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
163-
dir_file.sync_all()?;
164-
Ok(())
155+
let mut tmp_file = fs::File::create(&tmp_file_path)?;
156+
tmp_file.write_all(&buf)?;
157+
tmp_file.sync_all()?;
165158
}
166159

167-
#[cfg(target_os = "windows")]
168-
{
169-
let res = if dest_file_path.exists() {
170-
call!(unsafe {
171-
windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
160+
let res = {
161+
let inner_lock_ref = {
162+
let mut outer_lock = self.locks.lock().unwrap();
163+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
164+
};
165+
let _guard = inner_lock_ref.write().unwrap();
166+
167+
#[cfg(not(target_os = "windows"))]
168+
{
169+
fs::rename(&tmp_file_path, &dest_file_path)?;
170+
let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
171+
dir_file.sync_all()?;
172+
Ok(())
173+
}
174+
175+
#[cfg(target_os = "windows")]
176+
{
177+
let res = if dest_file_path.exists() {
178+
call!(unsafe {
179+
windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
172180
path_to_windows_str(&dest_file_path).as_ptr(),
173181
path_to_windows_str(&tmp_file_path).as_ptr(),
174182
std::ptr::null(),
175183
windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
176184
std::ptr::null_mut() as *const core::ffi::c_void,
177185
std::ptr::null_mut() as *const core::ffi::c_void,
178186
)
179-
})
180-
} else {
181-
call!(unsafe {
182-
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
187+
})
188+
} else {
189+
call!(unsafe {
190+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
183191
path_to_windows_str(&tmp_file_path).as_ptr(),
184192
path_to_windows_str(&dest_file_path).as_ptr(),
185193
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
186194
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
187195
)
188-
})
189-
};
190-
191-
match res {
192-
Ok(()) => {
193-
// We fsync the dest file in hopes this will also flush the metadata to disk.
194-
let dest_file =
195-
fs::OpenOptions::new().read(true).write(true).open(&dest_file_path)?;
196-
dest_file.sync_all()?;
197-
Ok(())
198-
},
199-
Err(e) => Err(e.into()),
196+
})
197+
};
198+
199+
match res {
200+
Ok(()) => {
201+
// We fsync the dest file in hopes this will also flush the metadata to disk.
202+
let dest_file = fs::OpenOptions::new()
203+
.read(true)
204+
.write(true)
205+
.open(&dest_file_path)?;
206+
dest_file.sync_all()?;
207+
Ok(())
208+
},
209+
Err(e) => Err(e.into()),
210+
}
200211
}
201-
}
202-
};
212+
};
203213

204-
self.garbage_collect_locks();
214+
self.garbage_collect_locks();
205215

206-
res
216+
res
217+
})
207218
}
208219

209220
fn remove(
@@ -330,12 +341,6 @@ impl KVStore for FilesystemStore {
330341

331342
Ok(keys)
332343
}
333-
334-
fn write_async(
335-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
336-
) -> AsyncResultType<'static, (), lightning::io::Error> {
337-
todo!()
338-
}
339344
}
340345

341346
fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {

lightning/src/util/sweep.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -675,9 +675,9 @@ where
675675
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
676676
self.best_block_updated_internal(&mut *state_lock, header, height);
677677

678-
let _ = self.persist_state(&*state_lock).map_err(|e| {
679-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
680-
});
678+
// let _ = self.persist_state(&*state_lock).map_err(|e| {
679+
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
680+
// });
681681
}
682682

683683
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -699,9 +699,9 @@ where
699699
}
700700
}
701701

702-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
703-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
704-
});
702+
// self.persist_state(&*state_lock).unwrap_or_else(|e| {
703+
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
704+
// });
705705
}
706706
}
707707

@@ -721,9 +721,9 @@ where
721721
) {
722722
let mut state_lock = self.sweeper_state.lock().unwrap();
723723
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
724-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
725-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
726-
});
724+
// self.persist_state(&*state_lock).unwrap_or_else(|e| {
725+
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
726+
// });
727727
}
728728

729729
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -744,18 +744,18 @@ where
744744
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
745745
.for_each(|o| o.status.unconfirmed());
746746

747-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
748-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
749-
});
747+
// self.persist_state(&*state_lock).unwrap_or_else(|e| {
748+
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
749+
// });
750750
}
751751
}
752752

753753
fn best_block_updated(&self, header: &Header, height: u32) {
754754
let mut state_lock = self.sweeper_state.lock().unwrap();
755755
self.best_block_updated_internal(&mut *state_lock, header, height);
756-
let _ = self.persist_state(&*state_lock).map_err(|e| {
757-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
758-
});
756+
// let _ = self.persist_state(&*state_lock).map_err(|e| {
757+
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
758+
// });
759759
}
760760

761761
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {

0 commit comments

Comments
 (0)