Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn do_hook_compact(
return Ok(());
}
CompactionLimits {
segment_limit: None,
segment_limit: Some(compaction_num_block_hint as usize),
block_limit: Some(compaction_num_block_hint as usize),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ pub struct SegmentCompactChecker {

compacted_segment_cnt: usize,
compacted_block_cnt: u64,
compacted_perfect_block_cnt: u64,
}

impl SegmentCompactChecker {
Expand All @@ -320,8 +321,9 @@ impl SegmentCompactChecker {
total_block_count: 0,
thresholds,
cluster_key_id,
compacted_block_cnt: 0,
compacted_segment_cnt: 0,
compacted_block_cnt: 0,
compacted_perfect_block_cnt: 0,
}
}

Expand Down Expand Up @@ -360,10 +362,13 @@ impl SegmentCompactChecker {
}

self.compacted_segment_cnt += segments.len();
self.compacted_block_cnt += segments
(self.compacted_block_cnt, self.compacted_perfect_block_cnt) = segments
.iter()
.map(|(_, info)| info.summary.block_count)
.sum::<u64>();
.map(|(_, info)| (info.summary.block_count, info.summary.perfect_block_count))
.fold(
(self.compacted_block_cnt, self.compacted_perfect_block_cnt),
|(b_sum, p_sum), (b, p)| (b_sum + b, p_sum + p),
);
true
}

Expand Down Expand Up @@ -415,15 +420,29 @@ impl SegmentCompactChecker {
self.generate_part(final_segments, parts);
}

/// Check if compaction limit is reached.
pub fn is_limit_reached(&self, num_segment_limit: usize, num_block_limit: usize) -> bool {
let residual_segment_cnt = self.segments.len();
let residual_block_cnt: u64 = self
let (compacted_block_cnt, compacted_perfect_block_cnt) = self
.segments
.iter()
.map(|(_, info)| info.summary.block_count)
.sum();
self.compacted_segment_cnt + residual_segment_cnt >= num_segment_limit
|| self.compacted_block_cnt + residual_block_cnt >= num_block_limit as u64
.map(|(_, info)| (info.summary.block_count, info.summary.perfect_block_count))
.fold(
(self.compacted_block_cnt, self.compacted_perfect_block_cnt),
|(b_sum, p_sum), (b, p)| (b_sum + b, p_sum + p),
);
let compacted_imperfect_block_cnt = compacted_block_cnt - compacted_perfect_block_cnt;
// When the total number of *imperfect blocks* reaches (num_block_limit - 1)
// and total selected blocks reach num_block_limit, we trigger compaction.
//
// Because `compact_num_block_hint` was intentionally increased by +1 during its calculation,
// to slightly expand the compaction range and include previously un-compacted segments.
if compacted_imperfect_block_cnt >= num_block_limit.saturating_sub(1) as u64
&& compacted_block_cnt >= num_block_limit as u64
{
return true;
}

self.compacted_segment_cnt + self.segments.len() >= num_segment_limit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,35 @@ select info:average_depth from clustering_information('i15760', 't1')

statement ok
drop table t1 all;

#ISSUE 18859
statement ok
create table t2(a int) row_per_block=5;

statement ok
insert into t2 select number from numbers(2);

statement ok
insert into t2 select number from numbers(12);

statement ok
insert into t2 select number from numbers(2);

query T
select block_count, row_count from fuse_segment('i15760', 't2');
----
1 2
2 12
1 2

statement ok
insert into t2 select number from numbers(2);

# after auto compaction
query T
select block_count, row_count from fuse_segment('i15760', 't2');
----
3 18

statement ok
drop table t2 all;