Skip to content

Commit 58abfda

Browse files
authored
Compaction V1: Manual compaction (#383)
This implements manual compaction strategy. Since `delete` in rocksdb doesn't cleanup space, but rather adds more space(tombstones), we trigger manual compaction on required ranges. Ranges are estimated as follows: 1. take size of dbs, take num of slots in db 2. estimate average size per slot 3. calculate how much slots we need to delete to clean 10% 4. deduce range from available range to truncate(LedgerTruncator::available_truncation_range) & num of slots from 3. 5. delete & compact see `LedgerTruncator::estimate_truncation_range` for implementation <!-- greptile_comment --> ## Greptile Summary Implements manual compaction strategy for RocksDB to manage tombstones left by deletions, with size-based truncation and concurrent compaction across column families. - Added `compact_range_cf` methods to `Database`, `Rocks`, and `LedgerColumn` structs to support manual compaction with optional range parameters - Implemented size-based truncation in `LedgerTruncator` that estimates slot ranges to clean ~10% of ledger space - Added concurrent compaction for different column families using `JoinSet` with proper memtable flushing - Renamed `truncate_slots` to `delete_slot_range` for clarity and improved error handling - Added comprehensive tests including block writing, finality boundaries and 1GB database scenarios <!-- /greptile_comment -->
1 parent 03ad1a2 commit 58abfda

File tree

7 files changed

+293
-78
lines changed

7 files changed

+293
-78
lines changed

magicblock-api/src/magic_validator.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,9 @@ impl MagicValidator {
757757

758758
// we have two memory mapped databases, flush them to disk before exiting
759759
self.bank.flush();
760-
self.ledger.flush();
760+
if let Err(err) = self.ledger.shutdown(false) {
761+
error!("Failed to shutdown ledger: {:?}", err);
762+
}
761763
}
762764

763765
pub fn join(self) {

magicblock-ledger/src/database/db.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl Database {
2525
pub fn open(
2626
path: &Path,
2727
options: LedgerOptions,
28-
) -> std::result::Result<Self, LedgerError> {
28+
) -> Result<Self, LedgerError> {
2929
let column_options = Arc::new(options.column_options.clone());
3030
let backend = Arc::new(Rocks::open(path, options)?);
3131

@@ -36,16 +36,13 @@ impl Database {
3636
})
3737
}
3838

39-
pub fn destroy(path: &Path) -> std::result::Result<(), LedgerError> {
39+
pub fn destroy(path: &Path) -> Result<(), LedgerError> {
4040
Rocks::destroy(path)?;
4141

4242
Ok(())
4343
}
4444

45-
pub fn get<C>(
46-
&self,
47-
key: C::Index,
48-
) -> std::result::Result<Option<C::Type>, LedgerError>
45+
pub fn get<C>(&self, key: C::Index) -> Result<Option<C::Type>, LedgerError>
4946
where
5047
C: TypedColumn + ColumnName,
5148
{
@@ -63,10 +60,7 @@ impl Database {
6360
pub fn iter<C>(
6461
&self,
6562
iterator_mode: IteratorMode<C::Index>,
66-
) -> std::result::Result<
67-
impl Iterator<Item = (C::Index, Box<[u8]>)> + '_,
68-
LedgerError,
69-
>
63+
) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + '_, LedgerError>
7064
where
7165
C: Column + ColumnName,
7266
{
@@ -160,6 +154,21 @@ impl Database {
160154
)
161155
}
162156

157+
/// See [crate::database::rocks_db::Rocks::compact_range_cf] for documentation.
158+
pub fn compact_range_cf<C>(
159+
&self,
160+
from: Option<C::Index>,
161+
to: Option<C::Index>,
162+
) where
163+
C: Column + ColumnName,
164+
{
165+
self.backend.compact_range_cf(
166+
self.cf_handle::<C>(),
167+
from.map(|index| C::key(index)),
168+
to.map(|index| C::key(index)),
169+
)
170+
}
171+
163172
pub fn is_primary_access(&self) -> bool {
164173
self.backend.is_primary_access()
165174
}

magicblock-ledger/src/database/ledger_column.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,20 @@ where
246246
) {
247247
write_batch.delete_range_cf::<C>(self.handle(), from, to);
248248
}
249+
250+
/// See [crate::database::rocks_db::Rocks::compact_range_cf] for documentation.
251+
pub fn compact_range(&self, from: Option<C::Index>, to: Option<C::Index>) {
252+
self.backend.compact_range_cf(
253+
self.handle(),
254+
from.map(|index| C::key(index)),
255+
to.map(|index| C::key(index)),
256+
)
257+
}
258+
259+
/// See [crate::database::rocks_db::Rocks::flush_cf] for documentation.
260+
pub fn flush(&self) -> LedgerResult<()> {
261+
self.backend.flush_cf(self.handle())
262+
}
249263
}
250264

251265
impl<C> LedgerColumn<C>

magicblock-ledger/src/database/rocks_db.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::{fs, path::Path};
22

33
use rocksdb::{
4-
ColumnFamily, DBIterator, DBPinnableSlice, DBRawIterator,
5-
IteratorMode as RocksIteratorMode, LiveFile, Options,
6-
WriteBatch as RWriteBatch, DB,
4+
AsColumnFamilyRef, ColumnFamily, DBIterator, DBPinnableSlice,
5+
DBRawIterator, FlushOptions, IteratorMode as RocksIteratorMode, LiveFile,
6+
Options, WriteBatch as RWriteBatch, DB,
77
};
88

99
use super::{
@@ -115,6 +115,33 @@ impl Rocks {
115115
Ok(())
116116
}
117117

118+
/// Compacts keys in range \[`from`, `to`\].
119+
/// For leveled compaction style, all files containing keys in the given range
120+
/// are compacted to the last level containing files.
121+
/// https://github.com/facebook/rocksdb/wiki/Manual-Compaction#compactrange
122+
pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
123+
&self,
124+
cf: &ColumnFamily,
125+
from_key: Option<S>,
126+
to_key: Option<E>,
127+
) {
128+
self.db.compact_range_cf(cf, from_key, to_key)
129+
}
130+
131+
/// Flushes column family
132+
pub fn flush_cf(&self, cf: &ColumnFamily) -> LedgerResult<()> {
133+
Ok(self.db.flush_cf(cf)?)
134+
}
135+
136+
/// Flushes column families
137+
pub fn flush_cfs_opt(
138+
&self,
139+
cfs: &[&impl AsColumnFamilyRef],
140+
options: &FlushOptions,
141+
) -> LedgerResult<()> {
142+
Ok(self.db.flush_cfs_opt(cfs, options)?)
143+
}
144+
118145
pub fn iterator_cf<C>(
119146
&self,
120147
cf: &ColumnFamily,

magicblock-ledger/src/ledger_truncator.rs

Lines changed: 137 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
1-
use std::{cmp::min, ops::ControlFlow, sync::Arc, time::Duration};
1+
use std::{cmp::min, sync::Arc, time::Duration};
22

3-
use log::{error, warn};
3+
use log::{error, info, warn};
44
use magicblock_core::traits::FinalityProvider;
55
use tokio::{
6-
task::{JoinError, JoinHandle},
6+
task::{JoinError, JoinHandle, JoinSet},
77
time::interval,
88
};
99
use tokio_util::sync::CancellationToken;
1010

11-
use crate::{errors::LedgerResult, Ledger};
11+
use crate::{
12+
database::columns::{
13+
AddressSignatures, Blockhash, Blocktime, PerfSamples, SlotSignatures,
14+
Transaction, TransactionMemos, TransactionStatus,
15+
},
16+
errors::LedgerResult,
17+
Ledger,
18+
};
1219

1320
pub const DEFAULT_TRUNCATION_TIME_INTERVAL: Duration =
14-
Duration::from_secs(10 * 60);
21+
Duration::from_secs(2 * 60);
22+
const PERCENTAGE_TO_TRUNCATE: u8 = 10;
1523

1624
struct LedgerTrunctationWorker<T> {
1725
finality_provider: Arc<T>,
@@ -46,34 +54,62 @@ impl<T: FinalityProvider> LedgerTrunctationWorker<T> {
4654
return;
4755
}
4856
_ = interval.tick() => {
49-
const TRUNCATE_TO_PERCENTAGE: u64 = 90;
50-
51-
match self.should_truncate() {
52-
Ok(true) => {
53-
if let Some((from_slot, to_slot)) = self.next_truncation_range() {
54-
let to_size = ( self.ledger_size / 100 ) * TRUNCATE_TO_PERCENTAGE;
55-
Self::truncate_to_size(&self.ledger, to_size, from_slot, to_slot);
56-
} else {
57-
warn!("Failed to get truncation range! Ledger size exceeded desired threshold");
58-
}
59-
},
60-
Ok(false) => (),
61-
Err(err) => error!("Failed to check truncation condition: {err}"),
57+
// Note: since we clean 10%, tombstones will take around 10% as well
58+
const FILLED_PERCENTAGE_LIMIT: u8 = 100 - PERCENTAGE_TO_TRUNCATE;
59+
60+
let current_size = match self.ledger.storage_size() {
61+
Ok(value) => value,
62+
Err(err) => {
63+
error!("Failed to check truncation condition: {err}");
64+
continue;
65+
}
66+
};
67+
68+
// Check if we should truncate
69+
if current_size < (self.ledger_size / 100) * FILLED_PERCENTAGE_LIMIT as u64 {
70+
continue;
71+
}
72+
73+
info!("Ledger size: {current_size}");
74+
match self.estimate_truncation_range(current_size) {
75+
Ok(Some((from_slot, to_slot))) => Self::truncate_slot_range(&self.ledger, from_slot, to_slot).await,
76+
Ok(None) => warn!("Could not estimate truncation range"),
77+
Err(err) => error!("Failed to estimate truncation range: {:?}", err),
6278
}
6379
}
6480
}
6581
}
6682
}
6783

68-
fn should_truncate(&self) -> LedgerResult<bool> {
69-
// Once size percentage reached, we start truncation
70-
const FILLED_PERCENTAGE_LIMIT: u64 = 98;
71-
Ok(self.ledger.storage_size()?
72-
>= (self.ledger_size / 100) * FILLED_PERCENTAGE_LIMIT)
84+
/// Returns range to truncate [from_slot, to_slot]
85+
fn estimate_truncation_range(
86+
&self,
87+
current_ledger_size: u64,
88+
) -> LedgerResult<Option<(u64, u64)>> {
89+
let (from_slot, to_slot) =
90+
if let Some(val) = self.available_truncation_range() {
91+
val
92+
} else {
93+
return Ok(None);
94+
};
95+
96+
let num_slots = self.ledger.count_blockhashes()?;
97+
if num_slots == 0 {
98+
info!("No slot were written yet. Nothing to truncate!");
99+
return Ok(None);
100+
}
101+
102+
let slot_size = current_ledger_size / num_slots as u64;
103+
let size_to_truncate =
104+
(current_ledger_size / 100) * PERCENTAGE_TO_TRUNCATE as u64;
105+
let num_slots_to_truncate = size_to_truncate / slot_size;
106+
107+
let to_slot = min(from_slot + num_slots_to_truncate, to_slot);
108+
Ok(Some((from_slot, to_slot)))
73109
}
74110

75111
/// Returns [from_slot, to_slot] range that's safe to truncate
76-
fn next_truncation_range(&self) -> Option<(u64, u64)> {
112+
fn available_truncation_range(&self) -> Option<(u64, u64)> {
77113
let lowest_cleanup_slot = self.ledger.get_lowest_cleanup_slot();
78114
let latest_final_slot = self.finality_provider.get_latest_final_slot();
79115

@@ -83,10 +119,16 @@ impl<T: FinalityProvider> LedgerTrunctationWorker<T> {
83119
// This could not happen because of Truncator
84120
warn!("Slots after latest final slot have been truncated!");
85121
}
122+
123+
info!(
124+
"Lowest cleanup slot ge than latest final slot. {}, {}",
125+
lowest_cleanup_slot, latest_final_slot
126+
);
86127
return None;
87128
}
88-
// Nothing to clean
89-
if latest_final_slot - 1 == lowest_cleanup_slot {
129+
// Nothing to truncate
130+
if latest_final_slot == lowest_cleanup_slot + 1 {
131+
info!("Nothing to truncate");
90132
return None;
91133
}
92134

@@ -103,9 +145,8 @@ impl<T: FinalityProvider> LedgerTrunctationWorker<T> {
103145

104146
/// Utility function for splitting truncation into smaller chunks
105147
/// Cleans slots [from_slot; to_slot] inclusive range
106-
pub fn truncate_to_size(
148+
pub async fn truncate_slot_range(
107149
ledger: &Arc<Ledger>,
108-
size: u64,
109150
from_slot: u64,
110151
to_slot: u64,
111152
) {
@@ -116,9 +157,13 @@ impl<T: FinalityProvider> LedgerTrunctationWorker<T> {
116157
warn!("LedgerTruncator: Nani?");
117158
return;
118159
}
160+
161+
info!(
162+
"LedgerTruncator: truncating slot range [{from_slot}; {to_slot}]"
163+
);
119164
(from_slot..=to_slot)
120165
.step_by(SINGLE_TRUNCATION_LIMIT)
121-
.try_for_each(|cur_from_slot| {
166+
.for_each(|cur_from_slot| {
122167
let num_slots_to_truncate = min(
123168
to_slot - cur_from_slot + 1,
124169
SINGLE_TRUNCATION_LIMIT as u64,
@@ -127,30 +172,76 @@ impl<T: FinalityProvider> LedgerTrunctationWorker<T> {
127172
cur_from_slot + num_slots_to_truncate - 1;
128173

129174
if let Err(err) =
130-
ledger.truncate_slots(cur_from_slot, truncate_to_slot)
175+
ledger.delete_slot_range(cur_from_slot, truncate_to_slot)
131176
{
132177
warn!(
133178
"Failed to truncate slots {}-{}: {}",
134179
cur_from_slot, truncate_to_slot, err
135180
);
136-
137-
return ControlFlow::Continue(());
138-
}
139-
140-
match ledger.storage_size() {
141-
Ok(current_size) => {
142-
if current_size <= size {
143-
ControlFlow::Break(())
144-
} else {
145-
ControlFlow::Continue(())
146-
}
147-
}
148-
Err(err) => {
149-
warn!("Failed to fetch Ledger size: {err}");
150-
ControlFlow::Continue(())
151-
}
152181
}
153182
});
183+
// Flush memtables with tombstones prior to compaction
184+
if let Err(err) = ledger.flush() {
185+
error!("Failed to flush ledger: {err}");
186+
}
187+
188+
Self::compact_slot_range(ledger, from_slot, to_slot).await;
189+
}
190+
191+
/// Utility function that triggers and awaits compaction on all the columns
192+
pub async fn compact_slot_range(
193+
ledger: &Arc<Ledger>,
194+
from_slot: u64,
195+
to_slot: u64,
196+
) {
197+
if to_slot < from_slot {
198+
warn!("LedgerTruncator: Nani2?");
199+
return;
200+
}
201+
202+
// Compaction can be run concurrently for different cf
203+
// but it utilizes rocksdb threads, in order not to drain
204+
// our tokio rt threads, we split the effort in just 3 tasks
205+
let mut join_set = JoinSet::new();
206+
join_set.spawn({
207+
let ledger = ledger.clone();
208+
async move {
209+
ledger.compact_slot_range_cf::<Blocktime>(
210+
Some(from_slot),
211+
Some(to_slot + 1),
212+
);
213+
ledger.compact_slot_range_cf::<Blockhash>(
214+
Some(from_slot),
215+
Some(to_slot + 1),
216+
);
217+
ledger.compact_slot_range_cf::<PerfSamples>(
218+
Some(from_slot),
219+
Some(to_slot + 1),
220+
);
221+
ledger.compact_slot_range_cf::<SlotSignatures>(
222+
Some((from_slot, u32::MIN)),
223+
Some((to_slot + 1, u32::MAX)),
224+
);
225+
}
226+
});
227+
228+
// Can not compact with specific range
229+
join_set.spawn({
230+
let ledger = ledger.clone();
231+
async move {
232+
ledger.compact_slot_range_cf::<TransactionStatus>(None, None);
233+
ledger.compact_slot_range_cf::<Transaction>(None, None);
234+
}
235+
});
236+
join_set.spawn({
237+
let ledger = ledger.clone();
238+
async move {
239+
ledger.compact_slot_range_cf::<TransactionMemos>(None, None);
240+
ledger.compact_slot_range_cf::<AddressSignatures>(None, None);
241+
}
242+
});
243+
244+
let _ = join_set.join_all().await;
154245
}
155246
}
156247

0 commit comments

Comments
 (0)