@@ -11,8 +11,10 @@ use std::{
 
 use anyhow::{Context, Result, bail};
 use byteorder::{BE, ReadBytesExt, WriteBytesExt};
+use dashmap::DashSet;
 use jiff::Timestamp;
 use memmap2::Mmap;
+use nohash_hasher::BuildNoHashHasher;
 use parking_lot::{Mutex, RwLock};
 
 pub use crate::compaction::selector::CompactConfig;
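The two new imports are worth a note: the values stored in the new set are already 64-bit key hashes, so `BuildNoHashHasher` lets the table use them verbatim instead of hashing a second time. A minimal standalone sketch of that combination (not part of this diff):

```rust
use dashmap::DashSet;
use nohash_hasher::BuildNoHashHasher;

fn main() {
    // The u64 values are precomputed key hashes, so an identity "hash"
    // function is both correct and cheaper than hashing again.
    let set: DashSet<u64, BuildNoHashHasher<u64>> =
        DashSet::with_hasher(BuildNoHashHasher::default());
    set.insert(0xdead_beef_u64);
    assert!(set.contains(&0xdead_beef_u64));
}
```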
@@ -104,15 +106,15 @@ struct TrackedStats {
 
 /// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
 /// using a single write batch. It allows for concurrent reads.
-pub struct TurboPersistence<S: ParallelScheduler> {
+pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
     parallel_scheduler: S,
     /// The path to the directory where the database is stored
     path: PathBuf,
     /// If true, the database is opened in read-only mode. In this mode, no writes are allowed and
     /// no modification on the database is performed.
     read_only: bool,
     /// The inner state of the database. Writing will update that.
-    inner: RwLock<Inner>,
+    inner: RwLock<Inner<FAMILIES>>,
     /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent
     /// write operations.
     active_write_operation: AtomicBool,
@@ -128,11 +130,15 @@ pub struct TurboPersistence<S: ParallelScheduler> {
 }
 
 /// The inner state of the database.
-struct Inner {
+struct Inner<const FAMILIES: usize> {
     /// The list of meta files in the database. This is used to derive the SST files.
     meta_files: Vec<MetaFile>,
     /// The current sequence number for the database.
     current_sequence_number: u32,
+    /// The in-progress set of hashes of keys that have been accessed.
+    /// It will be flushed to disk (into a meta file) on the next commit.
+    /// It's a DashSet to allow modification while only holding a read lock on Inner.
+    accessed_key_hashes: [DashSet<u64, BuildNoHashHasher<u64>>; FAMILIES],
 }
 
 pub struct CommitOptions {
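The doc comment on the new field is the key design point: `DashSet` is internally sharded and its `insert` takes `&self`, so readers can record hashes while holding only the `RwLock` read guard on `Inner`. A minimal standalone sketch of the pattern (the names here are illustrative, not the crate's API):

```rust
use dashmap::DashSet;
use parking_lot::RwLock;

struct Inner {
    accessed: DashSet<u64>,
}

fn record_access(inner: &RwLock<Inner>, hash: u64) {
    // A read guard is enough: DashSet::insert takes &self and
    // synchronizes internally per shard, so many readers can
    // record hashes concurrently without the write lock.
    let guard = inner.read();
    guard.accessed.insert(hash);
}

fn main() {
    let inner = RwLock::new(Inner { accessed: DashSet::new() });
    record_access(&inner, 42);
    assert!(inner.read().accessed.contains(&42));
}
```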
@@ -145,7 +151,7 @@ pub struct CommitOptions {
     keys_written: u64,
 }
 
-impl<S: ParallelScheduler + Default> TurboPersistence<S> {
+impl<S: ParallelScheduler + Default, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
     /// Open a TurboPersistence database at the given path.
     /// This will read the directory and might perform cleanup when the database was not closed
     /// properly. Cleanup only requires reading a few bytes from a few files and deleting
@@ -161,7 +167,7 @@ impl<S: ParallelScheduler + Default> TurboPersistence<S> {
     }
 }
 
-impl<S: ParallelScheduler> TurboPersistence<S> {
+impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES> {
     fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self {
         Self {
             parallel_scheduler,
@@ -170,6 +176,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             inner: RwLock::new(Inner {
                 meta_files: Vec::new(),
                 current_sequence_number: 0,
+                accessed_key_hashes: [(); FAMILIES]
+                    .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())),
             }),
             active_write_operation: AtomicBool::new(false),
             amqf_cache: AmqfCache::with(
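`[(); FAMILIES].map(...)` is the standard trick for building a `[T; N]` when `T` is not `Copy` and there is no `Default` array constructor for arbitrary `N`: map over an array of units and construct one value per slot. A small sketch of the same trick with a plain `HashSet`:

```rust
use std::collections::HashSet;

const FAMILIES: usize = 4;

fn main() {
    // `[(); N]` is trivially constructible; `.map` then builds one
    // independent (non-Copy) value per element of the array.
    let sets: [HashSet<u64>; FAMILIES] = [(); FAMILIES].map(|_| HashSet::new());
    assert_eq!(sets.len(), FAMILIES);
}
```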
@@ -405,7 +413,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
     /// time. The WriteBatch needs to be committed with [`TurboPersistence::commit_write_batch`].
     /// Note that the WriteBatch might start writing data to disk while it's being filled with data.
     /// This data will only become visible after the WriteBatch is committed.
-    pub fn write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
+    pub fn write_batch<K: StoreKey + Send + Sync + 'static>(
         &self,
     ) -> Result<WriteBatch<K, S, FAMILIES>> {
         if self.read_only {
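With `FAMILIES` now a parameter of `TurboPersistence` itself, callers pick the family count once at the type level and `write_batch` no longer needs its own const turbofish. A toy model of this change (not the crate's real API):

```rust
use std::marker::PhantomData;

// The const parameter lives on the store type, so methods can use it
// without declaring their own const generic, as write_batch did before.
struct Store<const FAMILIES: usize>;

struct Batch<K, const FAMILIES: usize> {
    _key: PhantomData<K>,
}

impl<const FAMILIES: usize> Store<FAMILIES> {
    // Compare with the removed signature, which was effectively
    // `fn write_batch<K, const FAMILIES: usize>` on a non-generic store.
    fn write_batch<K>(&self) -> Batch<K, FAMILIES> {
        Batch { _key: PhantomData }
    }
}

fn main() {
    let store: Store<3> = Store;
    let _batch = store.write_batch::<u64>();
}
```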
@@ -443,7 +451,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 
     /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it
     /// visible to readers.
-    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static, const FAMILIES: usize>(
+    pub fn commit_write_batch<K: StoreKey + Send + Sync + 'static>(
         &self,
         mut write_batch: WriteBatch<K, S, FAMILIES>,
     ) -> Result<()> {
@@ -456,7 +464,27 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             new_sst_files,
             new_blob_files,
             keys_written,
-        } = write_batch.finish()?;
+        } = write_batch.finish(|family| {
+            let inner = self.inner.read();
+            let set = &inner.accessed_key_hashes[family as usize];
+            // len is only a snapshot at this point and can change while we create the filter,
+            // so we give it 5% more space to make resizes less likely.
+            let initial_capacity = set.len() * 21 / 20;
+            let mut amqf =
+                qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8)
+                    .unwrap();
+            // This drains items from the set. Due to concurrency it might not be empty
+            // afterwards, but that's fine: leftover items will be part of the next commit.
+            set.retain(|hash| {
+                // Performance-wise it would usually be better to insert sorted fingerprints,
+                // but we assume that hashes are uniformly distributed, which makes sorting
+                // unnecessary. Inserting in the order of the dashset's buckets is also good
+                // for cache locality.
+                amqf.insert_fingerprint(false, *hash)
+                    .expect("Failed to insert fingerprint");
+                false
+            });
+            amqf
+        })?;
         self.commit(CommitOptions {
             new_meta_files,
             new_sst_files,
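The drain above relies on `retain` returning `false` after consuming each element, which removes entries while other threads may still be inserting; anything inserted concurrently simply survives into the next commit. A standalone sketch of the drain-and-headroom pattern, with a `Vec` standing in for the qfilter AMQF:

```rust
use dashmap::DashSet;

fn drain_into_filter(set: &DashSet<u64>) -> Vec<u64> {
    // len() is only a snapshot; reserve ~5% headroom so concurrent
    // inserts are less likely to force the collection to resize.
    let initial_capacity = set.len() * 21 / 20;
    let mut filter = Vec::with_capacity(initial_capacity);
    // Returning `false` from retain removes each visited entry,
    // turning the closure into a concurrency-friendly drain.
    set.retain(|hash| {
        filter.push(*hash);
        false
    });
    filter
}

fn main() {
    let set = DashSet::new();
    for h in [1u64, 2, 3] {
        set.insert(h);
    }
    let drained = drain_into_filter(&set);
    assert_eq!(drained.len(), 3);
    assert!(set.is_empty());
}
```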
@@ -1178,24 +1206,27 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                     self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed);
                 }
                 MetaLookupResult::SstLookup(result) => match result {
-                    SstLookupResult::Found(result) => match result {
-                        LookupValue::Deleted => {
-                            #[cfg(feature = "stats")]
-                            self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
-                            return Ok(None);
-                        }
-                        LookupValue::Slice { value } => {
-                            #[cfg(feature = "stats")]
-                            self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
-                            return Ok(Some(value));
-                        }
-                        LookupValue::Blob { sequence_number } => {
-                            #[cfg(feature = "stats")]
-                            self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
-                            let blob = self.read_blob(sequence_number)?;
-                            return Ok(Some(blob));
+                    SstLookupResult::Found(result) => {
+                        inner.accessed_key_hashes[family].insert(hash);
+                        match result {
+                            LookupValue::Deleted => {
+                                #[cfg(feature = "stats")]
+                                self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
+                                return Ok(None);
+                            }
+                            LookupValue::Slice { value } => {
+                                #[cfg(feature = "stats")]
+                                self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
+                                return Ok(Some(value));
+                            }
+                            LookupValue::Blob { sequence_number } => {
+                                #[cfg(feature = "stats")]
+                                self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
+                                let blob = self.read_blob(sequence_number)?;
+                                return Ok(Some(blob));
+                            }
                         }
-                    },
+                    }
                     SstLookupResult::NotFound => {
                         #[cfg(feature = "stats")]
                         self.stats.miss_key.fetch_add(1, Ordering::Relaxed);