Skip to content

Commit 8b9934f

Browse files
taco-pacobmuddhathlorenz
authored
Ledger purgatory (#332)
Implementation of a service that periodically purges the database. It preserves the slots after the latest snapshot plus an additional interval that depends on the user's configuration of the `desired-size` field. The estimation function for the interval is `EphemeralConfig::estimate_purge_slot_interval`. Using that interval, slots prior to `latest_snapshot_slot - interval` are cleaned periodically once the desired size is reached. Since the implementation may change in the future, I added `FinalityProvider`, which for now just provides `latest_snapshot_slot` but may in the future be changed to provide the latest final slot. <!-- greptile_comment --> ## Greptile Summary Implemented a ledger purging service called "Ledger Purgatory" that periodically cleans up old database slots while preserving recent history based on snapshot slots and configurable size limits. - Added `LedgerPurgatory` service in `/magicblock-ledger/src/ledger_purgatory.rs` that safely purges slots before `latest_snapshot_slot - slots_to_preserve` - Introduced `FinalityProvider` trait and implementation that currently uses latest snapshot slot but is extensible for future finality mechanisms - Added `desired_size` configuration in `LedgerConfig` to control purging interval based on estimated transaction sizes - Implemented chunked purging in `store/api.rs` to avoid overwhelming RocksDB when cleaning up large ranges of slots - Added comprehensive tests in `tests/ledger_purgatory.rs` covering various purging scenarios and edge cases The implementation appears sound but would benefit from additional documentation around the purging strategy and configuration options. <sub>💡 (2/5) Greptile learns from your feedback when you react with 👍/👎!</sub> <!-- /greptile_comment --> --------- Co-authored-by: Babur Makhmudov <bmuddha13@gmail.com> Co-authored-by: Thorsten Lorenz <thlorenz@gmx.de>
1 parent b351792 commit 8b9934f

File tree

31 files changed

+1014
-217
lines changed

31 files changed

+1014
-217
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

magicblock-accounts-db/src/lib.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ use solana_account::{
1111
};
1212
use solana_pubkey::Pubkey;
1313
use storage::AccountsStorage;
14+
15+
use crate::snapshot::SnapSlot;
16+
1417
pub type AdbResult<T> = Result<T, AccountsDbError>;
1518
/// Stop the World Lock, used to halt all writes to adb while
1619
/// some critical operation is in action, e.g. snapshotting
@@ -257,6 +260,42 @@ impl AccountsDb {
257260
}
258261
}
259262

263+
/// Returns the slot of the latest snapshot, or `None` if there is no snapshot
264+
/// or its path cannot be parsed into a slot (the parse failure is logged as an error)
265+
pub fn get_latest_snapshot_slot(&self) -> Option<u64> {
266+
self.snapshot_engine
267+
.with_snapshots(|snapshots| -> Option<u64> {
268+
let latest_path = snapshots.back()?;
269+
SnapSlot::try_from_path(latest_path)
270+
.map(|snap_slot: SnapSlot| snap_slot.slot())
271+
.or_else(|| {
272+
error!(
273+
"Failed to parse the path into SnapSlot: {}",
274+
latest_path.display()
275+
);
276+
None
277+
})
278+
})
279+
}
280+
281+
/// Returns the slot of the oldest maintained snapshot, or `None` if there is no snapshot
282+
/// or its path cannot be parsed into a slot (the parse failure is logged as an error)
283+
pub fn get_oldest_snapshot_slot(&self) -> Option<u64> {
284+
self.snapshot_engine
285+
.with_snapshots(|snapshots| -> Option<u64> {
286+
let oldest_path = snapshots.front()?;
287+
SnapSlot::try_from_path(oldest_path)
288+
.map(|snap_slot: SnapSlot| snap_slot.slot())
289+
.or_else(|| {
290+
error!(
291+
"Failed to parse the path into SnapSlot: {}",
292+
oldest_path.display()
293+
);
294+
None
295+
})
296+
})
297+
}
298+
260299
/// Checks whether AccountsDB has "freshness", not exceeding given slot
261300
/// Returns current slot if true, otherwise tries to rollback to the
262301
/// most recent snapshot, which is older than the provided slot

magicblock-accounts-db/src/snapshot.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@ impl SnapshotEngine {
7070
Ok(())
7171
}
7272

73+
/// Provides read-only access to the internal snapshots queue.
74+
///
75+
/// Locks the queue and calls the closure `f` exactly once with an immutable reference to the snapshots [`VecDeque`].
76+
/// The lock is held until `f` returns, so `f` should be short; whatever `f` returns is handed back to the caller.
77+
pub(crate) fn with_snapshots<F, R>(&self, f: F) -> R
78+
where
79+
F: FnOnce(&VecDeque<PathBuf>) -> R,
80+
{
81+
let snapshots = self.snapshots.lock();
82+
f(&snapshots)
83+
}
84+
7385
/// Try to rollback to snapshot which is the most recent one before given slot
7486
///
7587
/// NOTE: In case of success, this deletes the primary
@@ -205,11 +217,11 @@ impl SnapshotEngine {
205217
}
206218

207219
#[derive(Eq, PartialEq, PartialOrd, Ord)]
208-
struct SnapSlot(u64);
220+
pub(crate) struct SnapSlot(u64);
209221

210222
impl SnapSlot {
211223
/// parse snapshot path to extract slot number
212-
fn try_from_path(path: &Path) -> Option<Self> {
224+
pub(crate) fn try_from_path(path: &Path) -> Option<Self> {
213225
path.file_name()
214226
.and_then(|s| s.to_str())
215227
.and_then(|s| s.split('-').nth(1))
@@ -221,6 +233,10 @@ impl SnapSlot {
221233
// enforce strict alphanumeric ordering by introducing extra padding
222234
ppath.join(format!("snapshot-{:0>12}", self.0))
223235
}
236+
237+
/// Returns the slot number wrapped by this `SnapSlot`.
pub(crate) fn slot(&self) -> u64 {
238+
self.0
239+
}
224240
}
225241

226242
/// Conventional byte to byte recursive directory copy,

magicblock-accounts/src/external_accounts_manager.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ where
209209
)
210210
.map_err(Box::new)?;
211211
}
212-
213212
// Done
214213
Ok(signatures)
215214
}

magicblock-api/src/magic_validator.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ use magicblock_bank::{
3939
};
4040
use magicblock_config::{EphemeralConfig, ProgramConfig};
4141
use magicblock_geyser_plugin::rpc::GeyserRpcService;
42-
use magicblock_ledger::{blockstore_processor::process_ledger, Ledger};
42+
use magicblock_ledger::{
43+
blockstore_processor::process_ledger,
44+
ledger_truncator::{LedgerTruncator, DEFAULT_TRUNCATION_TIME_INTERVAL},
45+
Ledger,
46+
};
4347
use magicblock_metrics::MetricsService;
4448
use magicblock_perf_service::SamplePerformanceService;
4549
use magicblock_processor::execute_transaction::TRANSACTION_INDEX_LOCK;
@@ -114,6 +118,7 @@ pub struct MagicValidator {
114118
token: CancellationToken,
115119
bank: Arc<Bank>,
116120
ledger: Arc<Ledger>,
121+
ledger_truncator: LedgerTruncator<Bank>,
117122
slot_ticker: Option<tokio::task::JoinHandle<()>>,
118123
pubsub_handle: RwLock<Option<thread::JoinHandle<()>>>,
119124
pubsub_close_handle: PubsubServiceCloseHandle,
@@ -192,6 +197,13 @@ impl MagicValidator {
192197
ledger.get_max_blockhash().map(|(slot, _)| slot)?,
193198
)?;
194199

200+
let ledger_truncator = LedgerTruncator::new(
201+
ledger.clone(),
202+
bank.clone(),
203+
DEFAULT_TRUNCATION_TIME_INTERVAL,
204+
config.validator_config.ledger.size,
205+
);
206+
195207
fund_validator_identity(&bank, &validator_pubkey);
196208
fund_magic_context(&bank);
197209
let faucet_keypair = funded_faucet(
@@ -344,6 +356,7 @@ impl MagicValidator {
344356
token,
345357
bank,
346358
ledger,
359+
ledger_truncator,
347360
accounts_manager,
348361
transaction_listener,
349362
transaction_status_sender,
@@ -580,6 +593,8 @@ impl MagicValidator {
580593
self.start_remote_account_updates_worker();
581594
self.start_remote_account_cloner_worker().await?;
582595

596+
self.ledger_truncator.start();
597+
583598
self.rpc_service.start().map_err(|err| {
584599
ApiError::FailedToStartJsonRpcService(format!("{:?}", err))
585600
})?;
@@ -677,11 +692,13 @@ impl MagicValidator {
677692
Ok(())
678693
}
679694

680-
pub fn stop(&self) {
695+
pub fn stop(&mut self) {
681696
self.exit.store(true, Ordering::Relaxed);
682697
self.rpc_service.close();
683698
PubsubService::close(&self.pubsub_close_handle);
684699
self.token.cancel();
700+
self.ledger_truncator.stop();
701+
685702
// wait a bit for services to stop
686703
thread::sleep(Duration::from_secs(1));
687704

magicblock-bank/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ rayon = { workspace = true, optional = true }
1717
serde = { workspace = true, features = ["rc"] }
1818
magicblock-accounts-db = { workspace = true }
1919
magicblock-program = { workspace = true }
20+
magicblock-core = { workspace = true }
2021
solana-accounts-db = { workspace = true }
2122
solana-address-lookup-table-program = { workspace = true }
2223
solana-bpf-loader-program = { workspace = true }

magicblock-bank/src/address_lookup_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl Bank {
3939
slot_hashes: &SlotHashes,
4040
) -> Result<LoadedAddresses, AddressLoaderError> {
4141
let table_account = self
42-
.adb
42+
.accounts_db
4343
.get_account(&table.account_key)
4444
.map(AccountSharedData::from)
4545
.map_err(|_| AddressLoaderError::LookupTableAccountNotFound)?;

magicblock-bank/src/bank.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use log::{debug, info, trace};
1717
use magicblock_accounts_db::{
1818
config::AccountsDbConfig, error::AccountsDbError, AccountsDb, StWLock,
1919
};
20+
use magicblock_core::traits::FinalityProvider;
2021
use solana_accounts_db::{
2122
accounts_update_notifier_interface::AccountsUpdateNotifierInterface,
2223
blockhash_queue::BlockhashQueue,
@@ -149,7 +150,7 @@ impl ForkGraph for SimpleForkGraph {
149150
//#[derive(Debug)]
150151
pub struct Bank {
151152
/// Shared reference to accounts database
152-
pub adb: AccountsDb,
153+
pub accounts_db: AccountsDb,
153154

154155
/// Bank epoch
155156
epoch: Epoch,
@@ -300,14 +301,16 @@ impl TransactionProcessingCallback for Bank {
300301
account: &Pubkey,
301302
owners: &[Pubkey],
302303
) -> Option<usize> {
303-
self.adb.account_matches_owners(account, owners).ok()
304+
self.accounts_db
305+
.account_matches_owners(account, owners)
306+
.ok()
304307
}
305308

306309
fn get_account_shared_data(
307310
&self,
308311
pubkey: &Pubkey,
309312
) -> Option<AccountSharedData> {
310-
self.adb.get_account(pubkey).map(Into::into).ok()
313+
self.accounts_db.get_account(pubkey).map(Into::into).ok()
311314
}
312315

313316
// NOTE: must hold idempotent for the same set of arguments
@@ -499,7 +502,7 @@ impl Bank {
499502
feature_set.activate(&disable_rent_fees_collection::ID, 1);
500503

501504
let mut bank = Self {
502-
adb,
505+
accounts_db: adb,
503506
epoch: Epoch::default(),
504507
epoch_schedule: EpochSchedule::default(),
505508
is_delta: AtomicBool::default(),
@@ -693,11 +696,11 @@ impl Bank {
693696
// Slot, Epoch
694697
// -----------------
695698
pub fn slot(&self) -> Slot {
696-
self.adb.slot()
699+
self.accounts_db.slot()
697700
}
698701

699702
fn set_slot(&self, slot: Slot) {
700-
self.adb.set_slot(slot);
703+
self.accounts_db.set_slot(slot);
701704
}
702705

703706
pub fn advance_slot(&self) -> Slot {
@@ -829,16 +832,16 @@ impl Bank {
829832
// Accounts
830833
// -----------------
831834
pub fn has_account(&self, pubkey: &Pubkey) -> bool {
832-
self.adb.contains_account(pubkey)
835+
self.accounts_db.contains_account(pubkey)
833836
}
834837

835838
pub fn get_account(&self, pubkey: &Pubkey) -> Option<AccountSharedData> {
836-
self.adb.get_account(pubkey).map(Into::into).ok()
839+
self.accounts_db.get_account(pubkey).map(Into::into).ok()
837840
}
838841

839842
/// fn store the single `account` with `pubkey`.
840843
pub fn store_account(&self, pubkey: Pubkey, account: AccountSharedData) {
841-
self.adb.insert_account(&pubkey, &account);
844+
self.accounts_db.insert_account(&pubkey, &account);
842845
if let Some(notifier) = &self.accounts_update_notifier {
843846
let slot = self.slot();
844847
notifier.notify_account_update(slot, &account, &None, &pubkey, 0);
@@ -850,13 +853,13 @@ impl Bank {
850853
&self,
851854
_sorted: bool,
852855
) -> impl Iterator<Item = (Pubkey, AccountSharedData)> + '_ {
853-
self.adb.iter_all()
856+
self.accounts_db.iter_all()
854857
}
855858

856859
pub fn store_accounts(&self, accounts: Vec<(Pubkey, AccountSharedData)>) {
857860
let slot = self.slot();
858861
for (pubkey, acc) in accounts {
859-
self.adb.insert_account(&pubkey, &acc);
862+
self.accounts_db.insert_account(&pubkey, &acc);
860863
if let Some(notifier) = &self.accounts_update_notifier {
861864
notifier.notify_account_update(slot, &acc, &None, &pubkey, 0);
862865
}
@@ -977,7 +980,7 @@ impl Bank {
977980
where
978981
F: Fn(&AccountSharedData) -> bool + Send + Sync,
979982
{
980-
self.adb
983+
self.accounts_db
981984
.get_program_accounts(program_id, filter)
982985
.inspect_err(|err| {
983986
log::error!("failed to load program accounts: {err}")
@@ -2303,7 +2306,7 @@ impl Bank {
23032306
}
23042307

23052308
pub fn accounts_db_storage_size(&self) -> u64 {
2306-
self.adb.storage_size()
2309+
self.accounts_db.storage_size()
23072310
}
23082311

23092312
// -----------------
@@ -2445,7 +2448,7 @@ impl Bank {
24452448
}
24462449

24472450
pub fn flush(&self) {
2448-
self.adb.flush(true);
2451+
self.accounts_db.flush(true);
24492452
}
24502453
}
24512454

@@ -2527,3 +2530,10 @@ fn max_number_of_accounts_to_collect(
25272530
})
25282531
.sum()
25292532
}
2533+
2534+
impl FinalityProvider for Bank {
2535+
fn get_latest_final_slot(&self) -> Slot {
2536+
// Use the oldest maintained snapshot's slot as the final slot; fall back to slot 0 (genesis) when none is available
2537+
self.accounts_db.get_oldest_snapshot_slot().unwrap_or(0)
2538+
}
2539+
}

magicblock-config/src/ledger.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ use serde::{Deserialize, Serialize};
22

33
use crate::helpers::serde_defaults::bool_true;
44

5+
// Default desired ledger size 100 GiB
6+
pub const DEFAULT_LEDGER_SIZE_BYTES: u64 = 100 * 1024 * 1024 * 1024;
7+
58
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
6-
#[serde(deny_unknown_fields)]
9+
#[serde(deny_unknown_fields, rename_all = "kebab-case")]
710
pub struct LedgerConfig {
811
/// If a previous ledger is found it is removed before starting the validator
912
/// This can be disabled by setting [Self::reset] to `false`.
@@ -13,13 +16,21 @@ pub struct LedgerConfig {
1316
// If left empty it will be auto-generated to a temporary folder
1417
#[serde(default)]
1518
pub path: Option<String>,
19+
// Target size, in bytes, that the ledger should be kept under.
20+
#[serde(default = "default_ledger_size")]
21+
pub size: u64,
22+
}
23+
24+
// Serde default for the ledger `size` field: returns DEFAULT_LEDGER_SIZE_BYTES.
const fn default_ledger_size() -> u64 {
25+
DEFAULT_LEDGER_SIZE_BYTES
1626
}
1727

1828
// Default ledger configuration: `reset` taken from `bool_true()`, no explicit
// `path`, and `size` set to DEFAULT_LEDGER_SIZE_BYTES.
impl Default for LedgerConfig {
1929
fn default() -> Self {
2030
Self {
2131
reset: bool_true(),
2232
path: Default::default(),
33+
size: DEFAULT_LEDGER_SIZE_BYTES,
2334
}
2435
}
2536
}

magicblock-config/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,11 @@ impl EphemeralConfig {
233233
if let Ok(ledger_path) = env::var("LEDGER_PATH") {
234234
config.ledger.path = Some(ledger_path);
235235
}
236+
if let Ok(ledger_path) = env::var("LEDGER_SIZE") {
237+
config.ledger.size = ledger_path.parse().unwrap_or_else(|err| {
238+
panic!("Failed to parse 'LEDGER_SIZE' as u64: {:?}", err)
239+
});
240+
}
236241

237242
// -----------------
238243
// Metrics

0 commit comments

Comments
 (0)