From da43760eb9cf4db92d854f1d409246b3e2c4f8dc Mon Sep 17 00:00:00 2001 From: zhyass Date: Fri, 17 Oct 2025 15:56:24 +0800 Subject: [PATCH 1/4] chore: compact block limit check --- src/query/service/src/interpreters/hook/compact_hook.rs | 2 +- .../src/operations/mutation/mutator/block_compact_mutator.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 26f68d3f49313..4a95619d95c58 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -100,7 +100,7 @@ async fn do_hook_compact( return Ok(()); } CompactionLimits { - segment_limit: None, + segment_limit: Some(compaction_num_block_hint as usize), block_limit: Some(compaction_num_block_hint as usize), } } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index 769140540786a..d58a87afbd842 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -362,7 +362,7 @@ impl SegmentCompactChecker { self.compacted_segment_cnt += segments.len(); self.compacted_block_cnt += segments .iter() - .map(|(_, info)| info.summary.block_count) + .map(|(_, info)| (info.summary.block_count - info.summary.perfect_block_count)) .sum::(); true } @@ -420,7 +420,7 @@ impl SegmentCompactChecker { let residual_block_cnt: u64 = self .segments .iter() - .map(|(_, info)| info.summary.block_count) + .map(|(_, info)| (info.summary.block_count - info.summary.perfect_block_count)) .sum(); self.compacted_segment_cnt + residual_segment_cnt >= num_segment_limit || self.compacted_block_cnt + residual_block_cnt >= num_block_limit as u64 From f634e8b00777bae4322c8f67ebebbb25269783a6 Mon Sep 17 
00:00:00 2001 From: zhyass Date: Sat, 18 Oct 2025 12:17:56 +0800 Subject: [PATCH 2/4] add test --- .../09_0041_auto_compaction.test | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test index 51dfb9e866df6..7c9404369b6e0 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test @@ -108,3 +108,35 @@ select info:average_depth from clustering_information('i15760', 't1') statement ok drop table t1 all; + +#ISSUE 18859 +statement ok +create table t2(a int) row_per_block=5; + +statement ok +insert into t2 select number from numbers(2); + +statement ok +insert into t2 select number from numbers(12); + +statement ok +insert into t2 select number from numbers(2); + +query T +select block_count, row_count from fuse_segment('i15760', 't2'); +---- +1 2 +2 12 +1 2 + +statement ok +insert into t2 select number from numbers(2); + +# after auto compaction +query T +select block_count, row_count from fuse_segment('i15760', 't2'); +---- +3 18 + +statement ok +drop table t2 all; From 289627d1557ebca83c56d35a342bd1471a1eecab Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 20 Oct 2025 19:57:08 +0800 Subject: [PATCH 3/4] fix --- .../mutation/mutator/block_compact_mutator.rs | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index d58a87afbd842..8bf86b8caa143 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -311,6 +311,7 @@ pub struct SegmentCompactChecker { 
compacted_segment_cnt: usize, compacted_block_cnt: u64, + compacted_perfect_block_cnt: u64, } impl SegmentCompactChecker { @@ -320,8 +321,9 @@ impl SegmentCompactChecker { total_block_count: 0, thresholds, cluster_key_id, - compacted_block_cnt: 0, compacted_segment_cnt: 0, + compacted_block_cnt: 0, + compacted_perfect_block_cnt: 0, } } @@ -360,10 +362,13 @@ impl SegmentCompactChecker { } self.compacted_segment_cnt += segments.len(); - self.compacted_block_cnt += segments + (self.compacted_block_cnt, self.compacted_perfect_block_cnt) = segments .iter() - .map(|(_, info)| (info.summary.block_count - info.summary.perfect_block_count)) - .sum::(); + .map(|(_, info)| (info.summary.block_count, info.summary.perfect_block_count)) + .fold( + (self.compacted_block_cnt, self.compacted_perfect_block_cnt), + |(b_sum, p_sum), (b, p)| (b_sum + b, p_sum + p), + ); true } @@ -415,15 +420,29 @@ impl SegmentCompactChecker { self.generate_part(final_segments, parts); } + /// Check if compaction limit is reached. pub fn is_limit_reached(&self, num_segment_limit: usize, num_block_limit: usize) -> bool { - let residual_segment_cnt = self.segments.len(); - let residual_block_cnt: u64 = self + let (compacted_block_cnt, compacted_perfect_block_cnt) = self .segments .iter() - .map(|(_, info)| (info.summary.block_count - info.summary.perfect_block_count)) - .sum(); - self.compacted_segment_cnt + residual_segment_cnt >= num_segment_limit - || self.compacted_block_cnt + residual_block_cnt >= num_block_limit as u64 + .map(|(_, info)| (info.summary.block_count, info.summary.perfect_block_count)) + .fold( + (self.compacted_block_cnt, self.compacted_perfect_block_cnt), + |(b_sum, p_sum), (b, p)| (b_sum + b, p_sum + p), + ); + let compacted_imperfect_block_cnt = compacted_block_cnt - compacted_perfect_block_cnt; + // When the total number of *imperfect blocks* reaches (num_block_limit - 1) + // and total selected blocks reach num_block_limit, we trigger compaction. 
+ // + // Because `compact_num_block_hint` was intentionally increased by +1 during its calculation, + // to slightly expand the compaction range and include previously un-compacted segments. + if compacted_imperfect_block_cnt >= num_block_limit.saturating_sub(1) as u64 + && compacted_block_cnt >= num_block_limit as u64 + { + return true; + } + + self.compacted_segment_cnt + self.segments.len() >= num_segment_limit } } From b86346bfa9c2f52f1627404f781c5884798f63df Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 23 Oct 2025 17:50:35 +0800 Subject: [PATCH 4/4] fix --- .../src/interpreters/hook/compact_hook.rs | 2 +- src/query/settings/src/settings_default.rs | 4 +- .../common/generators/append_generator.rs | 6 +- .../mutation/mutator/block_compact_mutator.rs | 85 ++++++++++++------- .../mutation/mutator/recluster_mutator.rs | 15 +++- 5 files changed, 71 insertions(+), 41 deletions(-) diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 4a95619d95c58..26f68d3f49313 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -100,7 +100,7 @@ async fn do_hook_compact( return Ok(()); } CompactionLimits { - segment_limit: Some(compaction_num_block_hint as usize), + segment_limit: None, block_limit: Some(compaction_num_block_hint as usize), } } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 2a86432c10d9b..192cd39af4ada 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -947,8 +947,8 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=u64::MAX)), }), ("compact_max_block_selection", DefaultSettingValue { - value: UserSettingValue::UInt64(10000), - desc: "Limits the maximum number of blocks that can be selected during a compact operation.", + value: UserSettingValue::UInt64(1000), + desc: "Limits 
the maximum number of imperfect blocks that can be selected during a compact operation.", mode: SettingMode::Both, scope: SettingScope::Both, range: Some(SettingRange::Numeric(2..=u64::MAX)), diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index 7381afa4ca691..037384237d3a0 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -207,14 +207,10 @@ impl SnapshotGenerator for AppendGenerator { // If imperfect_count is larger, SLIGHTLY increase the number of blocks // eligible for auto-compaction, this adjustment is intended to help reduce // fragmentation over time. - // - // To prevent the off-by-one mistake, we need to add 1 to it; - // this way, the potentially previously left non-compacted segment will - // also be included. let compact_num_block_hint = std::cmp::min( imperfect_count, (auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64, - ) + 1; + ); info!("set compact_num_block_hint to {compact_num_block_hint }"); self.ctx .set_compaction_num_block_hint(table_info.name.as_str(), compact_num_block_hint); diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index 8bf86b8caa143..ab029f2ed6208 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -118,6 +118,7 @@ impl BlockCompactMutator { let mut segment_idx = 0; let mut is_end = false; + let mut stop_after_next = false; let mut parts = Vec::new(); let chunk_size = max_threads * 4; for chunk in segment_locations.chunks(chunk_size) { @@ -154,10 +155,25 @@ impl BlockCompactMutator { 
checker.generate_part(segments, &mut parts); } - if checker.is_limit_reached(num_segment_limit, num_block_limit) { + if stop_after_next { is_end = true; break; } + + match checker.is_limit_reached(num_segment_limit, num_block_limit) { + CompactLimitState::Continue => {} + CompactLimitState::ReachedBlockLimit => { + // When the block limit is reached, we allow one more iteration + // to include the next segment in compaction. + // This "+1" behavior ensures that previously un-compacted segments + // near the boundary are not skipped due to strict block counting. + stop_after_next = true; + } + CompactLimitState::ReachedSegmentLimit => { + is_end = true; + break; + } + } } // Status. @@ -303,6 +319,16 @@ impl BlockCompactMutator { } } +// CompactLimitState indicates the current compaction progress state. +pub enum CompactLimitState { + /// Continue collecting more segments and blocks. + Continue, + /// Hit the block threshold — take one more segment before stopping. + ReachedBlockLimit, + /// Hit the segment threshold — stop immediately. 
+ ReachedSegmentLimit, +} + pub struct SegmentCompactChecker { thresholds: BlockThresholds, segments: Vec<(SegmentIndex, Arc)>, @@ -310,8 +336,7 @@ pub struct SegmentCompactChecker { cluster_key_id: Option, compacted_segment_cnt: usize, - compacted_block_cnt: u64, - compacted_perfect_block_cnt: u64, + compacted_imperfect_block_cnt: u64, } impl SegmentCompactChecker { @@ -322,8 +347,7 @@ impl SegmentCompactChecker { thresholds, cluster_key_id, compacted_segment_cnt: 0, - compacted_block_cnt: 0, - compacted_perfect_block_cnt: 0, + compacted_imperfect_block_cnt: 0, } } @@ -362,13 +386,10 @@ impl SegmentCompactChecker { } self.compacted_segment_cnt += segments.len(); - (self.compacted_block_cnt, self.compacted_perfect_block_cnt) = segments + self.compacted_imperfect_block_cnt += segments .iter() - .map(|(_, info)| (info.summary.block_count, info.summary.perfect_block_count)) - .fold( - (self.compacted_block_cnt, self.compacted_perfect_block_cnt), - |(b_sum, p_sum), (b, p)| (b_sum + b, p_sum + p), - ); + .map(|(_, info)| info.summary.block_count - info.summary.perfect_block_count) + .sum::(); true } @@ -421,28 +442,30 @@ impl SegmentCompactChecker { } /// Check if compaction limit is reached. - pub fn is_limit_reached(&self, num_segment_limit: usize, num_block_limit: usize) -> bool { - let (compacted_block_cnt, compacted_perfect_block_cnt) = self - .segments - .iter() - .map(|(_, info)| (info.summary.block_count, info.summary.perfect_block_count)) - .fold( - (self.compacted_block_cnt, self.compacted_perfect_block_cnt), - |(b_sum, p_sum), (b, p)| (b_sum + b, p_sum + p), - ); - let compacted_imperfect_block_cnt = compacted_block_cnt - compacted_perfect_block_cnt; - // When the total number of *imperfect blocks* reaches (num_block_limit - 1) - // and total selected blocks reach num_block_limit, we trigger compaction. 
- // - // Because `compact_num_block_hint` was intentionally increased by +1 during its calculation, - // to slightly expand the compaction range and include previously un-compacted segments. - if compacted_imperfect_block_cnt >= num_block_limit.saturating_sub(1) as u64 - && compacted_block_cnt >= num_block_limit as u64 - { - return true; + pub fn is_limit_reached( + &self, + num_segment_limit: usize, + num_block_limit: usize, + ) -> CompactLimitState { + // Stop immediately if the number of compacted segments reaches the limit + if self.compacted_segment_cnt + self.segments.len() >= num_segment_limit { + return CompactLimitState::ReachedSegmentLimit; } - self.compacted_segment_cnt + self.segments.len() >= num_segment_limit + // Count the total number of imperfect blocks (those that still need compaction). + let compacted_imperfect_block_cnt = + self.segments + .iter() + .fold(self.compacted_imperfect_block_cnt, |mut acc, (_, info)| { + acc += info.summary.block_count - info.summary.perfect_block_count; + acc + }); + // If the imperfect block count reaches or exceeds the limit, signal "take one more". 
+ if compacted_imperfect_block_cnt >= num_block_limit as u64 { + CompactLimitState::ReachedBlockLimit + } else { + CompactLimitState::Continue + } } } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index 31683958c8b44..4422a29bb2d6f 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -52,6 +52,7 @@ use opendal::Operator; use crate::io::MetaReaders; use crate::operations::common::BlockMetaIndex as BlockIndex; +use crate::operations::mutation::mutator::block_compact_mutator::CompactLimitState; use crate::operations::mutation::SegmentCompactChecker; use crate::operations::BlockCompactMutator; use crate::operations::CompactLazyPartInfo; @@ -413,7 +414,7 @@ impl ReclusterMutator { let mut parts = Vec::new(); let mut checker = SegmentCompactChecker::new(self.block_thresholds, Some(self.cluster_key_id)); - + let mut stop_after_next = false; for (loc, compact_segment) in compact_segments.into_iter() { recluster_blocks_count += compact_segment.summary.block_count; let segments_vec = checker.add(loc.segment_idx, compact_segment); @@ -421,9 +422,19 @@ impl ReclusterMutator { checker.generate_part(segments, &mut parts); } - if checker.is_limit_reached(num_segment_limit, num_block_limit) { + if stop_after_next { break; } + + match checker.is_limit_reached(num_segment_limit, num_block_limit) { + CompactLimitState::Continue => {} + CompactLimitState::ReachedBlockLimit => { + stop_after_next = true; + } + CompactLimitState::ReachedSegmentLimit => { + break; + } + } } // finalize the compaction. checker.finalize(&mut parts);