Commit a2f04be

fix: use spawn_blocking instead of runtime tasks for compaction (#527)
## Greptile Summary

Updated On: 2025-09-04 05:20:29 UTC

This PR refactors the RocksDB compaction operations in the ledger truncation process to run under `tokio::task::spawn_blocking` instead of async tasks. The change addresses a performance issue in which CPU-intensive RocksDB compaction ran on, and blocked, the Tokio async runtime's worker threads.

**Key Changes:**

1. **In `ledger_truncator.rs`**: Replaces the coordinated async approach, which used a `JoinSet` of three concurrent tasks, with a single `spawn_blocking` call. All compactions now run sequentially in one task on Tokio's dedicated blocking thread pool, and the caller awaits its `JoinHandle`, logging any `JoinError`.
2. **In `store/api.rs`**: Adds performance instrumentation to the `compact_slot_range_cf` method, wrapping the compaction operation in a timing measurement and logging the duration per column family for operational visibility.

The changes integrate with the existing RocksDB column-family system defined in the ledger's database layer. The compaction operations target specific column families such as `Blocktime`, `TransactionStatus`, `Transaction`, `TransactionMemos`, and `AddressSignatures`, which are part of the structured database schema for managing Solana transaction data. The refactoring follows a common async Rust pattern: move blocking operations to a dedicated thread pool so the main async runtime stays responsive for I/O-bound work.

## Confidence score: 2/5

- This PR introduces a significant behavioral change: compactions that previously ran concurrently across three tasks now run sequentially in a single blocking task
- The score reflects that loss of concurrency and the coarser error handling (a single `JoinHandle` whose failure is only logged)
- Pay close attention to `ledger_truncator.rs`, where the coordination of the compaction work changes

---------

Co-authored-by: Gabriele Picco <piccogabriele@gmail.com>
Parent: a34d8c5 · Commit: a2f04be
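To make the pattern concrete, here is a minimal, self-contained sketch of the `spawn_blocking` technique described above. It is not the PR's code: the CPU-bound closure is a placeholder, and only the `tokio::task::spawn_blocking` call and the awaited `JoinHandle` mirror the actual change (assumes a `tokio` dependency with the `rt-multi-thread` and `macros` features).

```rust
use std::time::{Duration, Instant};

// Placeholder for CPU-bound work such as a RocksDB manual compaction.
fn heavy_compaction_like_work() -> Duration {
    let start = Instant::now();
    let mut acc: u64 = 0;
    for i in 0..50_000_000u64 {
        acc = acc.wrapping_add(i ^ (i >> 3));
    }
    assert_ne!(acc, 42); // keep `acc` observable so the loop isn't optimized away
    start.elapsed()
}

#[tokio::main]
async fn main() {
    // `spawn_blocking` moves the closure onto Tokio's dedicated blocking
    // thread pool, leaving async worker threads free for I/O-bound tasks.
    let handle = tokio::task::spawn_blocking(heavy_compaction_like_work);

    // Awaiting the JoinHandle keeps completion visible to the caller;
    // a JoinError means the blocking task panicked or was cancelled.
    match handle.await {
        Ok(elapsed) => println!("blocking work finished in {elapsed:?}"),
        Err(err) => eprintln!("blocking task failed: {err}"),
    }
}
```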

File tree: 3 files changed, +40 −60 lines

magicblock-ledger/src/database/rocksdb_options.rs

Lines changed: 10 additions & 17 deletions
```diff
@@ -9,20 +9,24 @@ pub fn get_rocksdb_options(access_type: &AccessType) -> Options {
     options.create_if_missing(true);
     options.create_missing_column_families(true);
 
-    // Per the docs, a good value for this is the number of cores on the machine
-    options.increase_parallelism(num_cpus::get() as i32);
-
     // Background thread prioritization: give flushes more threads, limit compaction threads (low-priority)
     let mut env = rocksdb::Env::new().unwrap();
     let cpus_env = num_cpus::get() as i32;
-    // Low-priority threads are used for compaction. Keep them small to favor foreground writes.
-    let low_pri = (cpus_env / 4).clamp(1, 2);
-    env.set_background_threads(low_pri);
+
+    // Bottom-priority are used for bottommost compactions. Keep it minimal - 1
+    let bottom_pri = 1;
+    env.set_bottom_priority_background_threads(bottom_pri);
     // High-priority threads are used for flush. Keep a few to avoid memtable flush backlog.
     let high_pri = cpus_env.clamp(2, 4);
     env.set_high_priority_background_threads(high_pri);
     options.set_env(&env);
 
+    // For every job RocksDB picks a thread from available pools
+    // Here we select ceiling so RocksDB doesn't use all of HIGH + LOW + BOTTOM threads
+    // By default num of threads in LOW is 1
+    let max_jobs = std::cmp::max(high_pri + 1, bottom_pri);
+    options.set_max_background_jobs(max_jobs);
+
     // Bound WAL size
     options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
@@ -44,13 +48,6 @@ pub fn get_rocksdb_options(access_type: &AccessType) -> Options {
     options.set_enable_pipelined_write(true);
     options.set_enable_write_thread_adaptive_yield(true);
 
-    // Background jobs: enough to keep up, not to starve CPU
-    // Cap at 8 or number of CPUs, whichever is smaller but at least 4
-    let cpus = num_cpus::get() as i32;
-    let max_jobs = cpus.clamp(4, 8);
-    options.set_max_background_jobs(max_jobs);
-    options.set_max_subcompactions(2);
-
     // Use direct IO for compaction/flush to avoid page cache contention
     options.set_use_direct_reads(true);
     options.set_use_direct_io_for_flush_and_compaction(true);
@@ -61,10 +58,6 @@ pub fn get_rocksdb_options(access_type: &AccessType) -> Options {
     // Start with a conservative 128 MiB/s, adjustable via config later if needed
     options.set_ratelimiter(128 * 1024 * 1024, 100 * 1000, 10);
 
-    // Prevent large compactions from monopolizing resources
-    options.set_soft_pending_compaction_bytes_limit(8 * 1024 * 1024 * 1024); // 8 GiB
-    options.set_hard_pending_compaction_bytes_limit(32 * 1024 * 1024 * 1024); // 32 GiB
-
     // Dynamic level bytes is a good default to balance levels
     options.set_level_compaction_dynamic_level_bytes(true);
     options.set_report_bg_io_stats(true);
```
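For reference, a minimal sketch of the thread-pool shaping this file converges on, assuming the `rocksdb` (rust-rocksdb) and `num_cpus` crates; the `tuned_options` helper is invented here for illustration, and the numeric choices simply restate the diff.

```rust
use rocksdb::{Env, Options};

// Hypothetical helper mirroring the commit's pool sizing; not the repo's API.
fn tuned_options() -> Options {
    let mut options = Options::default();
    options.create_if_missing(true);

    let mut env = Env::new().expect("failed to create RocksDB env");
    let cpus = num_cpus::get() as i32;

    // BOTTOM pool handles bottommost-level compactions; keep it at 1.
    let bottom_pri = 1;
    env.set_bottom_priority_background_threads(bottom_pri);

    // HIGH pool handles memtable flushes; 2..=4 threads depending on the host.
    let high_pri = cpus.clamp(2, 4);
    env.set_high_priority_background_threads(high_pri);
    options.set_env(&env);

    // Cap concurrent background jobs so flushes plus the single default
    // LOW-pool compaction thread never saturate every pool at once.
    options.set_max_background_jobs(std::cmp::max(high_pri + 1, bottom_pri));

    options
}

fn main() {
    let _options = tuned_options();
    println!("RocksDB options configured");
}
```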

magicblock-ledger/src/ledger_truncator.rs

Lines changed: 26 additions & 43 deletions
```diff
@@ -2,9 +2,8 @@ use std::{cmp::min, sync::Arc, time::Duration};
 
 use log::{error, info, warn};
 use magicblock_core::traits::FinalityProvider;
-use solana_measure::measure::Measure;
 use tokio::{
-    task::{JoinError, JoinHandle, JoinSet},
+    task::{spawn_blocking, JoinError, JoinHandle},
     time::interval,
 };
 use tokio_util::sync::CancellationToken;
@@ -281,49 +280,33 @@ impl<T: FinalityProvider> LedgerTrunctationWorker<T> {
         // Compaction can be run concurrently for different cf
         // but it utilizes rocksdb threads, in order not to drain
         // our tokio rt threads, we split the effort in just 3 tasks
-        let mut measure = Measure::start("Manual compaction");
-        let mut join_set = JoinSet::new();
-        join_set.spawn({
-            let ledger = ledger.clone();
-            async move {
-                ledger.compact_slot_range_cf::<Blocktime>(
-                    Some(from_slot),
-                    Some(to_slot + 1),
-                );
-                ledger.compact_slot_range_cf::<Blockhash>(
-                    Some(from_slot),
-                    Some(to_slot + 1),
-                );
-                ledger.compact_slot_range_cf::<PerfSamples>(
-                    Some(from_slot),
-                    Some(to_slot + 1),
-                );
-                ledger.compact_slot_range_cf::<SlotSignatures>(
-                    Some((from_slot, u32::MIN)),
-                    Some((to_slot + 1, u32::MAX)),
-                );
-            }
-        });
+        let ledger_copy = ledger.clone();
+        let handler = spawn_blocking(move || {
+            ledger_copy.compact_slot_range_cf::<Blocktime>(
+                Some(from_slot),
+                Some(to_slot + 1),
+            );
+            ledger_copy.compact_slot_range_cf::<Blockhash>(
+                Some(from_slot),
+                Some(to_slot + 1),
+            );
+            ledger_copy.compact_slot_range_cf::<PerfSamples>(
+                Some(from_slot),
+                Some(to_slot + 1),
+            );
+            ledger_copy.compact_slot_range_cf::<SlotSignatures>(
+                Some((from_slot, u32::MIN)),
+                Some((to_slot + 1, u32::MAX)),
+            );
 
-        // Can not compact with specific range
-        join_set.spawn({
-            let ledger = ledger.clone();
-            async move {
-                ledger.compact_slot_range_cf::<TransactionStatus>(None, None);
-                ledger.compact_slot_range_cf::<Transaction>(None, None);
-            }
+            ledger_copy.compact_slot_range_cf::<TransactionStatus>(None, None);
+            ledger_copy.compact_slot_range_cf::<Transaction>(None, None);
+            ledger_copy.compact_slot_range_cf::<TransactionMemos>(None, None);
+            ledger_copy.compact_slot_range_cf::<AddressSignatures>(None, None);
         });
-        join_set.spawn({
-            let ledger = ledger.clone();
-            async move {
-                ledger.compact_slot_range_cf::<TransactionMemos>(None, None);
-                ledger.compact_slot_range_cf::<AddressSignatures>(None, None);
-            }
-        });
-
-        let _ = join_set.join_all().await;
-        measure.stop();
-        info!("Manual compaction took: {measure}");
+        if let Err(err) = handler.await {
+            error!("compaction aborted {}", err);
+        }
     }
 }
```
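One behavioral note on this hunk: the compactions that previously ran across three concurrent tasks now run sequentially inside a single blocking task. If per-column-family concurrency were ever wanted back, the same technique extends to multiple awaited handles; the sketch below is hypothetical (its `compact` function is a stand-in, not the ledger's API) and assumes a `tokio` dependency.

```rust
use std::{thread, time::Duration};
use tokio::task::{spawn_blocking, JoinHandle};

// Stand-in for a per-column-family compaction call.
fn compact(name: &'static str) {
    println!("compacting {name} on {:?}", thread::current().id());
    thread::sleep(Duration::from_millis(200));
}

#[tokio::main]
async fn main() {
    // Each blocking task gets its own thread from the blocking pool,
    // so the compactions run in parallel without touching async workers.
    let handles: Vec<JoinHandle<()>> = ["blocktime", "transaction", "memos"]
        .into_iter()
        .map(|cf| spawn_blocking(move || compact(cf)))
        .collect();

    // Awaiting every handle restores the old JoinSet semantics of not
    // proceeding until all compactions have finished.
    for handle in handles {
        if let Err(err) = handle.await {
            eprintln!("compaction task failed: {err}");
        }
    }
}
```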

magicblock-ledger/src/store/api.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -1276,7 +1276,11 @@ impl Ledger {
         from: Option<C::Index>,
         to: Option<C::Index>,
     ) {
+        let mut measure = Measure::start("compaction");
         self.db.column::<C>().compact_range(from, to);
+        measure.stop();
+
+        info!("Compaction of column {} took: {}", C::NAME, measure);
     }
 
     /// Flushes all columns
```
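`Measure` here comes from the `solana_measure` crate. The same start/stop/log pattern can be sketched without that dependency using `std::time::Instant`; the `compact_with_timing` helper below is hypothetical, not part of the ledger API.

```rust
use std::time::{Duration, Instant};

// Hypothetical helper showing the timing-instrumentation pattern the
// diff applies to `compact_slot_range_cf`.
fn compact_with_timing(column: &str, compact: impl FnOnce()) {
    let start = Instant::now();
    compact();
    // Logging per column family makes slow compactions visible in
    // operational logs without extra tooling.
    println!("Compaction of column {column} took: {:?}", start.elapsed());
}

fn main() {
    compact_with_timing("Blocktime", || {
        std::thread::sleep(Duration::from_millis(50));
    });
}
```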
