Skip to content

Commit 561da4c

Browse files
committed
Remove lazy flag from {KVStore,KVStoreSync}::remove
The utility of the `lazy` flag was always not entirely clear mod some cloud environments where you actually could save an explicit call in some scenarios by batching the remove with subsequent calls. However, given the recent addition of the async `KVStore` introduced addtional ordering constraints its unclear how implementation could actually still benefit from the 'eventual' consistency properties originally envisioned. As the `lazy` flag then just amounts to a bunch of additonal complexity everywhere, we here simply drop it from the `KVStore`/`KVStoreSync` interfaces, simplifying implementations on both ends.
1 parent d076584 commit 561da4c

File tree

8 files changed

+102
-132
lines changed

8 files changed

+102
-132
lines changed

fuzz/src/fs_store.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
7878
Some(b) => b[0],
7979
None => break,
8080
};
81-
match v % 13 {
81+
match v % 12 {
8282
// Sync write
8383
0 => {
8484
let data_value = get_next_data_value();
@@ -96,8 +96,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
9696
},
9797
// Sync remove
9898
1 => {
99-
KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key, false)
100-
.unwrap();
99+
KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key).unwrap();
101100

102101
current_data = None;
103102
},
@@ -131,10 +130,8 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
131130
handles.push(handle);
132131
},
133132
// Async remove
134-
10 | 11 => {
135-
let lazy = v == 10;
136-
let fut =
137-
KVStore::remove(fs_store, primary_namespace, secondary_namespace, key, lazy);
133+
10 => {
134+
let fut = KVStore::remove(fs_store, primary_namespace, secondary_namespace, key);
138135

139136
// Already set the current_data, even though writing hasn't finished yet. This supports the call-time
140137
// ordering semantics.
@@ -144,7 +141,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
144141
handles.push(handle);
145142
},
146143
// Join tasks.
147-
12 => {
144+
11 => {
148145
for handle in handles.drain(..) {
149146
let _ = handle.await.unwrap();
150147
}

lightning-background-processor/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -736,14 +736,14 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
736736
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
737737
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
738738
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
739-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
739+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
740740
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
741741
/// # }
742742
/// # struct Store {}
743743
/// # impl lightning::util::persist::KVStore for Store {
744744
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
745745
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
746-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
746+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
747747
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
748748
/// # }
749749
/// # use core::time::Duration;
@@ -2135,9 +2135,9 @@ mod tests {
21352135
}
21362136

21372137
fn remove(
2138-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
2138+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
21392139
) -> lightning::io::Result<()> {
2140-
self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
2140+
self.kv_store.remove(primary_namespace, secondary_namespace, key)
21412141
}
21422142

21432143
fn list(

lightning-liquidity/src/lsps2/service.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1674,7 +1674,6 @@ where
16741674
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
16751675
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
16761676
&key,
1677-
true,
16781677
)
16791678
.await?;
16801679
}

lightning-liquidity/src/lsps5/service.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ where
272272
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
273273
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
274274
&key,
275-
true,
276275
)
277276
.await?;
278277
}

lightning-persister/src/fs_store.rs

Lines changed: 68 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl KVStoreSync for FilesystemStore {
125125
}
126126

127127
fn remove(
128-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
128+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
129129
) -> Result<(), lightning::io::Error> {
130130
let path = self.inner.get_checked_dest_file_path(
131131
primary_namespace,
@@ -134,7 +134,7 @@ impl KVStoreSync for FilesystemStore {
134134
"remove",
135135
)?;
136136
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
137-
self.inner.remove_version(inner_lock_ref, path, lazy, version)
137+
self.inner.remove_version(inner_lock_ref, path, version)
138138
}
139139

140140
fn list(
@@ -334,81 +334,76 @@ impl FilesystemStoreInner {
334334
}
335335

336336
fn remove_version(
337-
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, lazy: bool, version: u64,
337+
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, version: u64,
338338
) -> lightning::io::Result<()> {
339339
self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
340340
if !dest_file_path.is_file() {
341341
return Ok(());
342342
}
343343

344-
if lazy {
345-
// If we're lazy we just call remove and be done with it.
344+
// We try our best to persist the updated metadata to ensure
345+
// atomicity of this call.
346+
#[cfg(not(target_os = "windows"))]
347+
{
346348
fs::remove_file(&dest_file_path)?;
347-
} else {
348-
// If we're not lazy we try our best to persist the updated metadata to ensure
349-
// atomicity of this call.
350-
#[cfg(not(target_os = "windows"))]
351-
{
352-
fs::remove_file(&dest_file_path)?;
353349

354-
let parent_directory = dest_file_path.parent().ok_or_else(|| {
355-
let msg = format!(
356-
"Could not retrieve parent directory of {}.",
357-
dest_file_path.display()
358-
);
359-
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
360-
})?;
361-
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
362-
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
363-
// to the inode might get cached (and hence possibly lost on crash), depending on
364-
// the target platform and file system.
365-
//
366-
// In order to assert we permanently removed the file in question we therefore
367-
// call `fsync` on the parent directory on platforms that support it.
368-
dir_file.sync_all()?;
369-
}
370-
371-
#[cfg(target_os = "windows")]
372-
{
373-
// Since Windows `DeleteFile` API is not persisted until the last open file handle
374-
// is dropped, and there seemingly is no reliable way to flush the directory
375-
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
376-
// file to be deleted to a temporary trash file and remove the latter file
377-
// afterwards.
378-
//
379-
// This should be marginally better, as, according to the documentation,
380-
// `MoveFileExW` APIs should offer stronger persistence guarantees,
381-
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
382-
// However, all this is partially based on assumptions and local experiments, as
383-
// Windows API is horribly underdocumented.
384-
let mut trash_file_path = dest_file_path.clone();
385-
let trash_file_ext =
386-
format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
387-
trash_file_path.set_extension(trash_file_ext);
350+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
351+
let msg = format!(
352+
"Could not retrieve parent directory of {}.",
353+
dest_file_path.display()
354+
);
355+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
356+
})?;
357+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
358+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
359+
// to the inode might get cached (and hence possibly lost on crash), depending on
360+
// the target platform and file system.
361+
//
362+
// In order to assert we permanently removed the file in question we therefore
363+
// call `fsync` on the parent directory on platforms that support it.
364+
dir_file.sync_all()?;
365+
}
388366

389-
call!(unsafe {
390-
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
391-
path_to_windows_str(&dest_file_path).as_ptr(),
392-
path_to_windows_str(&trash_file_path).as_ptr(),
393-
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
367+
#[cfg(target_os = "windows")]
368+
{
369+
// Since Windows `DeleteFile` API is not persisted until the last open file handle
370+
// is dropped, and there seemingly is no reliable way to flush the directory
371+
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
372+
// file to be deleted to a temporary trash file and remove the latter file
373+
// afterwards.
374+
//
375+
// This should be marginally better, as, according to the documentation,
376+
// `MoveFileExW` APIs should offer stronger persistence guarantees,
377+
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
378+
// However, all this is partially based on assumptions and local experiments, as
379+
// Windows API is horribly underdocumented.
380+
let mut trash_file_path = dest_file_path.clone();
381+
let trash_file_ext =
382+
format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
383+
trash_file_path.set_extension(trash_file_ext);
384+
385+
call!(unsafe {
386+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
387+
path_to_windows_str(&dest_file_path).as_ptr(),
388+
path_to_windows_str(&trash_file_path).as_ptr(),
389+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
394390
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
395-
)
396-
})?;
397-
398-
{
399-
// We fsync the trash file in hopes this will also flush the original's file
400-
// metadata to disk.
401-
let trash_file = fs::OpenOptions::new()
402-
.read(true)
403-
.write(true)
404-
.open(&trash_file_path.clone())?;
405-
trash_file.sync_all()?;
406-
}
391+
)
392+
})?;
407393

408-
// We're fine if this remove would fail as the trash file will be cleaned up in
409-
// list eventually.
410-
fs::remove_file(trash_file_path).ok();
394+
{
395+
// We fsync the trash file in hopes this will also flush the original's file
396+
// metadata to disk.
397+
let trash_file = fs::OpenOptions::new()
398+
.read(true)
399+
.write(true)
400+
.open(&trash_file_path.clone())?;
401+
trash_file.sync_all()?;
411402
}
403+
404+
// We're fine if this remove would fail as the trash file will be cleaned up in
405+
// list eventually.
406+
fs::remove_file(trash_file_path).ok();
412407
}
413408

414409
Ok(())
@@ -508,7 +503,7 @@ impl KVStore for FilesystemStore {
508503
}
509504

510505
fn remove(
511-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
506+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
512507
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
513508
let this = Arc::clone(&self.inner);
514509
let path = match this.get_checked_dest_file_path(
@@ -523,11 +518,11 @@ impl KVStore for FilesystemStore {
523518

524519
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
525520
Box::pin(async move {
526-
tokio::task::spawn_blocking(move || {
527-
this.remove_version(inner_lock_ref, path, lazy, version)
528-
})
529-
.await
530-
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
521+
tokio::task::spawn_blocking(move || this.remove_version(inner_lock_ref, path, version))
522+
.await
523+
.unwrap_or_else(|e| {
524+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
525+
})
531526
})
532527
}
533528

@@ -772,7 +767,7 @@ mod tests {
772767
let fut1 = async_fs_store.write(primary_namespace, secondary_namespace, key, data1);
773768
assert_eq!(fs_store.state_size(), 1);
774769

775-
let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key, false);
770+
let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key);
776771
assert_eq!(fs_store.state_size(), 1);
777772

778773
let fut3 = async_fs_store.write(primary_namespace, secondary_namespace, key, data2.clone());
@@ -799,7 +794,7 @@ mod tests {
799794
assert_eq!(data2, &*read_data);
800795

801796
// Test remove.
802-
async_fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
797+
async_fs_store.remove(primary_namespace, secondary_namespace, key).await.unwrap();
803798

804799
let listed_keys =
805800
async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap();

lightning-persister/src/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
4040
let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap();
4141
assert_eq!(data, &*read_data);
4242

43-
kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap();
43+
kv_store.remove(primary_namespace, secondary_namespace, key).unwrap();
4444

4545
let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
4646
assert_eq!(listed_keys.len(), 0);
@@ -57,7 +57,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
5757
let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
5858
assert_eq!(data, &*read_data);
5959

60-
kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
60+
kv_store.remove(&max_chars, &max_chars, &max_chars).unwrap();
6161

6262
let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
6363
assert_eq!(listed_keys.len(), 0);

0 commit comments

Comments
 (0)