@@ -803,15 +803,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
             })
             .collect::<Vec<_>>();
 
-        let families = ssts_with_ranges
-            .iter()
-            .map(|s| s.range.family)
-            .max()
-            .unwrap() as usize
-            + 1;
-
-        let mut sst_by_family = Vec::with_capacity(families);
-        sst_by_family.resize_with(families, Vec::new);
+        let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new());
 
         for sst in ssts_with_ranges {
            sst_by_family[sst.range.family as usize].push(sst);
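
The new initialization leans on a small Rust idiom worth noting: `Vec<T>` is not `Copy`, so `[Vec::new(); FAMILIES]` would not compile, and mapping over a `[(); FAMILIES]` unit array is the usual way to build a fixed-size array of non-`Copy` values. A minimal standalone sketch of the idiom (names are illustrative, not from this codebase):

```rust
// Build a fixed-size array of empty Vecs, one slot per family.
// `[Vec::new(); N]` does not compile because `Vec<T>` is not `Copy`,
// so we map a unit array (unit *is* `Copy`) into fresh Vecs instead.
const FAMILIES: usize = 4;

fn group_by_family(items: Vec<(usize, u64)>) -> [Vec<u64>; FAMILIES] {
    let mut by_family: [Vec<u64>; FAMILIES] = [(); FAMILIES].map(|_| Vec::new());
    for (family, item) in items {
        by_family[family].push(item);
    }
    by_family
}

fn main() {
    let grouped = group_by_family(vec![(0, 10), (2, 20), (0, 30)]);
    assert_eq!(grouped[0], vec![10, 30]);
    assert_eq!(grouped[2], vec![20]);
    assert!(grouped[1].is_empty() && grouped[3].is_empty());
}
```

Compared to the removed `Vec::with_capacity` + `resize_with` version, this also moves the family count to compile time via the `FAMILIES` const generic, so no pass over `ssts_with_ranges` is needed to discover it.
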
@@ -846,6 +838,22 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
             })
             .collect::<Vec<_>>();
 
+        let mut used_key_hashes = [(); FAMILIES].map(|_| Vec::new());
+
+        {
+            for &(family, ..) in merge_jobs.iter() {
+                used_key_hashes[family].extend(
+                    meta_files
+                        .iter()
+                        .filter(|m| m.family() == family as u32)
+                        .filter_map(|meta_file| {
+                            meta_file.deserialize_used_key_hashes_amqf().transpose()
+                        })
+                        .collect::<Result<Vec<_>>>()?,
+                );
+            }
+        }
+
         let result = self
             .parallel_scheduler
             .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
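
The loop above collects, per family with a merge job, every AMQF (approximate membership query filter) of used key hashes that the meta files expose. Only the `contains(hash)` side of those filters is exercised later in this diff. A hedged sketch of that lookup pattern, with a plain `HashSet` standing in for the real AMQF type (a real approximate filter may report false positives but never false negatives, which is safe here: a false positive merely routes an entry to the "used" collector):

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the AMQF used in the diff; only its
// `contains(hash)` method is exercised there.
struct ApproxFilter(HashSet<u64>);

impl ApproxFilter {
    fn contains(&self, hash: u64) -> bool {
        self.0.contains(&hash)
    }
}

// Mirrors the `used_key_hashes[family].iter().any(...)` check below:
// a key hash counts as used if any filter of its family matches.
fn is_used(used_key_hashes: &[Vec<ApproxFilter>], family: usize, hash: u64) -> bool {
    used_key_hashes[family].iter().any(|amqf| amqf.contains(hash))
}

fn main() {
    let mut used_key_hashes: [Vec<ApproxFilter>; 2] = [(); 2].map(|_| Vec::new());
    used_key_hashes[0].push(ApproxFilter(HashSet::from([42])));
    assert!(is_used(&used_key_hashes, 0, 42));
    assert!(!is_used(&used_key_hashes, 0, 7));
    assert!(!is_used(&used_key_hashes, 1, 42)); // filters are per family
}
```
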
@@ -979,110 +987,145 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
 
             let mut keys_written = 0;
 
-            let mut total_key_size = 0;
-            let mut total_value_size = 0;
             let mut current: Option<LookupEntry<'_>> = None;
-            let mut entries = Vec::new();
-            let mut last_entries = Vec::new();
-            let mut last_entries_total_key_size = 0;
+
+            #[derive(Default)]
+            struct Collector<'l> {
+                entries: Vec<LookupEntry<'l>>,
+                total_key_size: usize,
+                total_value_size: usize,
+                last_entries: Vec<LookupEntry<'l>>,
+                last_entries_total_key_size: usize,
+            }
+            let mut used_collector = Collector::default();
+            let mut unused_collector = Collector::default();
             for entry in iter {
                 let entry = entry?;
 
                 // Remove duplicates
                 if let Some(current) = current.take() {
                     if current.key != entry.key {
+                        let is_used = used_key_hashes[family as usize]
+                            .iter()
+                            .any(|amqf| amqf.contains(current.hash));
+                        let collector = if is_used {
+                            &mut used_collector
+                        } else {
+                            &mut unused_collector
+                        };
                         let key_size = current.key.len();
                         let value_size = current.value.uncompressed_size_in_sst();
-                        total_key_size += key_size;
-                        total_value_size += value_size;
+                        collector.total_key_size += key_size;
+                        collector.total_value_size += value_size;
 
-                        if total_key_size + total_value_size
+                        if collector.total_key_size + collector.total_value_size
                             > DATA_THRESHOLD_PER_COMPACTED_FILE
-                            || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
+                            || collector.entries.len()
+                                >= MAX_ENTRIES_PER_COMPACTED_FILE
                         {
                             let selected_total_key_size =
-                                last_entries_total_key_size;
-                            swap(&mut entries, &mut last_entries);
-                            last_entries_total_key_size = total_key_size - key_size;
-                            total_key_size = key_size;
-                            total_value_size = value_size;
-
-                            if !entries.is_empty() {
+                                collector.last_entries_total_key_size;
+                            swap(
+                                &mut collector.entries,
+                                &mut collector.last_entries,
+                            );
+                            collector.last_entries_total_key_size =
+                                collector.total_key_size - key_size;
+                            collector.total_key_size = key_size;
+                            collector.total_value_size = value_size;
+
+                            if !collector.entries.is_empty() {
                                 let seq = sequence_number
                                     .fetch_add(1, Ordering::SeqCst)
                                     + 1;
 
-                                keys_written += entries.len() as u64;
+                                keys_written += collector.entries.len() as u64;
                                 new_sst_files.push(create_sst_file(
                                     &self.parallel_scheduler,
-                                    &entries,
+                                    &collector.entries,
                                     selected_total_key_size,
                                     path,
                                     seq,
                                 )?);
 
-                                entries.clear();
+                                collector.entries.clear();
                             }
                         }
 
-                        entries.push(current);
+                        collector.entries.push(current);
                     } else {
                         // Override value
                     }
                 }
                 current = Some(entry);
             }
             if let Some(entry) = current {
-                total_key_size += entry.key.len();
+                let is_used = used_key_hashes[family as usize]
+                    .iter()
+                    .any(|amqf| amqf.contains(entry.hash));
+                let collector = if is_used {
+                    &mut used_collector
+                } else {
+                    &mut unused_collector
+                };
+
+                collector.total_key_size += entry.key.len();
                 // Obsolete as we no longer need total_value_size
                 // total_value_size += entry.value.uncompressed_size_in_sst();
-                entries.push(entry);
+                collector.entries.push(entry);
             }
 
             // If we have one set of entries left, write them to a new SST file
-            if last_entries.is_empty() && !entries.is_empty() {
-                let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
-
-                keys_written += entries.len() as u64;
-                new_sst_files.push(create_sst_file(
-                    &self.parallel_scheduler,
-                    &entries,
-                    total_key_size,
-                    path,
-                    seq,
-                )?);
-            } else
-            // If we have two sets of entries left, merge them and
-            // split it into two SST files, to avoid having a
-            // single SST file that is very small.
-            if !last_entries.is_empty() {
-                last_entries.append(&mut entries);
-
-                last_entries_total_key_size += total_key_size;
-
-                let (part1, part2) = last_entries.split_at(last_entries.len() / 2);
-
-                let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
-                let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
-
-                keys_written += part1.len() as u64;
-                new_sst_files.push(create_sst_file(
-                    &self.parallel_scheduler,
-                    part1,
-                    // We don't know the exact sizes so we estimate them
-                    last_entries_total_key_size / 2,
-                    path,
-                    seq1,
-                )?);
-
-                keys_written += part2.len() as u64;
-                new_sst_files.push(create_sst_file(
-                    &self.parallel_scheduler,
-                    part2,
-                    last_entries_total_key_size / 2,
-                    path,
-                    seq2,
-                )?);
+            for collector in [&mut used_collector, &mut unused_collector] {
+                if collector.last_entries.is_empty()
+                    && !collector.entries.is_empty()
+                {
+                    let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
+
+                    keys_written += collector.entries.len() as u64;
+                    new_sst_files.push(create_sst_file(
+                        &self.parallel_scheduler,
+                        &collector.entries,
+                        collector.total_key_size,
+                        path,
+                        seq,
+                    )?);
+                } else
+                // If we have two sets of entries left, merge them and
+                // split it into two SST files, to avoid having a
+                // single SST file that is very small.
+                if !collector.last_entries.is_empty() {
+                    collector.last_entries.append(&mut collector.entries);
+
+                    collector.last_entries_total_key_size +=
+                        collector.total_key_size;
+
+                    let (part1, part2) = collector
+                        .last_entries
+                        .split_at(collector.last_entries.len() / 2);
+
+                    let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
+                    let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
+
+                    keys_written += part1.len() as u64;
+                    new_sst_files.push(create_sst_file(
+                        &self.parallel_scheduler,
+                        part1,
+                        // We don't know the exact sizes so we estimate them
+                        collector.last_entries_total_key_size / 2,
+                        path,
+                        seq1,
+                    )?);
+
+                    keys_written += part2.len() as u64;
+                    new_sst_files.push(create_sst_file(
+                        &self.parallel_scheduler,
+                        part2,
+                        collector.last_entries_total_key_size / 2,
+                        path,
+                        seq2,
+                    )?);
+                }
             }
             Ok(PartialMergeResult::Merged {
                 new_sst_files,
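
The final `for collector in [...]` loop flushes whatever each collector still buffers: a single remaining buffer is written as-is, while two remaining buffers are merged and re-split evenly so the last SST file is never tiny. A simplified, self-contained sketch of that flush decision (the SST write is abstracted into a closure; all names are illustrative):

```rust
// Flush the two trailing buffers left over from the threshold loop:
// `entries` is the buffer still being filled, `last_entries` the one
// completed just before it. Writing them separately could leave a
// full file next to a nearly empty one, so two leftover buffers are
// merged and split into two even halves instead.
fn flush<T>(
    mut entries: Vec<T>,
    mut last_entries: Vec<T>,
    mut write_file: impl FnMut(Vec<T>),
) {
    if last_entries.is_empty() {
        if !entries.is_empty() {
            // One buffer left: write it as-is.
            write_file(entries);
        }
    } else {
        // Two buffers left: merge, then split into two even halves.
        last_entries.append(&mut entries);
        let half = last_entries.len() / 2;
        let part2 = last_entries.split_off(half);
        write_file(last_entries); // part 1
        write_file(part2);
    }
}

fn main() {
    let mut sizes = Vec::new();
    flush(
        (0..5).collect::<Vec<u32>>(),   // buffer still being filled
        (0..100).collect::<Vec<u32>>(), // previously completed buffer
        |file| sizes.push(file.len()),
    );
    // 52 + 53 entries instead of a 100-entry file next to a 5-entry one.
    assert_eq!(sizes, vec![52, 53]);
}
```

Running this per collector means the used and unused partitions each end in at most two reasonably sized files, at the cost of the estimated (halved) key sizes noted in the diff's own comment.
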