Skip to content

Commit 738a8d7

Browse files
bmuddhagreptile-apps[bot]GabrielePicco
authored
Bmuddha/fix/limit num monitoring accounts (#356)
This PR introduces a quick fix to the existing cloning pipeline: undelegated accounts are removed from the validator (both from the database and from websocket monitoring) once a certain preconfigured threshold is reached. Eviction of those accounts follows an LRU strategy. Every transaction affects this cache by touching the account and renewing its "stay", so only stale/unused accounts are removed. Delegated accounts are not affected by this change. <!-- greptile_comment --> ## Greptile Summary Implemented account monitoring limits using an LRU cache to prevent unbounded memory growth in the validator. - Added `monitored_accounts` LRU cache in `RemoteAccountClonerWorker` with configurable capacity (default 2048) - Implemented account eviction logic in `track_not_delegated_account` that removes accounts from AccountsDB and websocket subscriptions - Added metrics tracking for monitored accounts count, subscriptions per shard, and evicted accounts - Modified account update system to support unsubscribing from account monitoring when accounts are evicted - Added `remove_account` capability to AccountsDB and providers to properly clean up evicted accounts <!-- /greptile_comment --> --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: Gabriele Picco <piccogabriele@gmail.com>
1 parent 1481dc1 commit 738a8d7

File tree

31 files changed

+482
-93
lines changed

31 files changed

+482
-93
lines changed

Cargo.lock

Lines changed: 26 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

magicblock-account-cloner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ solana-sdk = { workspace = true }
2222
tokio = { workspace = true }
2323
tokio-util = { workspace = true }
2424
thiserror = { workspace = true }
25+
lru = "0.14"
2526

2627
[dev-dependencies]

magicblock-account-cloner/src/remote_account_cloner_worker.rs

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
cell::RefCell,
23
collections::{hash_map::Entry, HashMap, HashSet},
34
sync::{Arc, RwLock},
45
time::Duration,
@@ -14,9 +15,10 @@ use futures_util::{
1415
stream::{self, StreamExt, TryStreamExt},
1516
};
1617
use log::*;
18+
use lru::LruCache;
1719
use magicblock_account_dumper::AccountDumper;
1820
use magicblock_account_fetcher::AccountFetcher;
19-
use magicblock_account_updates::AccountUpdates;
21+
use magicblock_account_updates::{AccountUpdates, AccountUpdatesResult};
2022
use magicblock_accounts_api::InternalAccountProvider;
2123
use magicblock_metrics::metrics;
2224
use magicblock_mutator::idl::{get_pubkey_anchor_idl, get_pubkey_shank_idl};
@@ -110,6 +112,22 @@ pub struct RemoteAccountClonerWorker<IAP, AFE, AUP, ADU> {
110112
clone_listeners: Arc<RwLock<HashMap<Pubkey, AccountClonerListeners>>>,
111113
last_clone_output: CloneOutputMap,
112114
validator_identity: Pubkey,
115+
monitored_accounts: RefCell<LruCache<Pubkey, ()>>,
116+
}
117+
118+
// SAFETY:
119+
// we never keep references to monitored_accounts around,
120+
// especially across await points, so this type is Send
121+
unsafe impl<IAP, AFE, AUP, ADU> Send
122+
for RemoteAccountClonerWorker<IAP, AFE, AUP, ADU>
123+
{
124+
}
125+
// SAFETY:
126+
// we never produce references to RefCell in monitored_accounts
127+
// especially not across await points, so this type is Sync
128+
unsafe impl<IAP, AFE, AUP, ADU> Sync
129+
for RemoteAccountClonerWorker<IAP, AFE, AUP, ADU>
130+
{
113131
}
114132

115133
impl<IAP, AFE, AUP, ADU> RemoteAccountClonerWorker<IAP, AFE, AUP, ADU>
@@ -131,10 +149,14 @@ where
131149
validator_charges_fees: ValidatorCollectionMode,
132150
permissions: AccountClonerPermissions,
133151
validator_authority: Pubkey,
152+
max_monitored_accounts: usize,
134153
) -> Self {
135154
let (clone_request_sender, clone_request_receiver) =
136155
unbounded_channel();
137156
let fetch_retries = 50;
157+
let max_monitored_accounts = max_monitored_accounts
158+
.try_into()
159+
.expect("max number of monitored accounts cannot be 0");
138160
Self {
139161
internal_account_provider,
140162
account_fetcher,
@@ -151,6 +173,7 @@ where
151173
clone_listeners: Default::default(),
152174
last_clone_output: Default::default(),
153175
validator_identity: validator_authority,
176+
monitored_accounts: LruCache::new(max_monitored_accounts).into(),
154177
}
155178
}
156179

@@ -169,16 +192,16 @@ where
169192
}
170193

171194
pub async fn start_clone_request_processing(
172-
&mut self,
195+
mut self,
173196
cancellation_token: CancellationToken,
174197
) {
198+
let mut requests = vec![];
175199
loop {
176-
let mut requests = vec![];
177200
tokio::select! {
178201
_ = self.clone_request_receiver.recv_many(&mut requests, 100) => {
179202
join_all(
180203
requests
181-
.into_iter()
204+
.drain(..)
182205
.map(|request| self.process_clone_request(request))
183206
).await;
184207
}
@@ -323,6 +346,7 @@ where
323346
.account_updates
324347
.get_last_known_update_slot(pubkey)
325348
.unwrap_or(u64::MIN);
349+
self.monitored_accounts.borrow_mut().promote(pubkey);
326350
// Check for the happy/fast path, we may already have cloned this account before
327351
match self.get_last_clone_output_from_pubkey(pubkey) {
328352
// If we already cloned this account, check what the output of the clone was
@@ -405,6 +429,39 @@ where
405429
Ok(updated_clone_output)
406430
}
407431

432+
/// Put the account's key into cache of monitored accounts, which has a limited capacity.
433+
/// Once the cache capacity exceeds the preconfigured limit, it will trigger an eviction,
434+
/// followed by account's removal from AccountsDB and termination of its ws subscription
435+
async fn track_not_delegated_account(
436+
&self,
437+
pubkey: Pubkey,
438+
) -> AccountUpdatesResult<()> {
439+
let evicted = self
440+
.monitored_accounts
441+
.borrow_mut()
442+
.push(pubkey, ())
443+
.filter(|(pk, _)| *pk != pubkey);
444+
if let Some((evicted, _)) = evicted {
445+
self.last_clone_output
446+
.write()
447+
.expect("last accounts clone output map is poisoned")
448+
.remove(&evicted);
449+
self.internal_account_provider.remove_account(&evicted);
450+
self.clone_listeners
451+
.write()
452+
.expect("clone listeners map is poisoned")
453+
.remove(&evicted);
454+
self.account_updates
455+
.stop_account_monitoring(&evicted)
456+
.await?;
457+
metrics::inc_evicted_accounts_count();
458+
}
459+
metrics::adjust_monitored_accounts_count(
460+
self.monitored_accounts.borrow().len(),
461+
);
462+
Ok(())
463+
}
464+
408465
async fn do_clone(
409466
&self,
410467
pubkey: &Pubkey,
@@ -428,8 +485,7 @@ where
428485
// - we may not want to track lamport changes, especially for payers
429486
self.account_updates
430487
.ensure_account_monitoring(pubkey)
431-
.await
432-
.map_err(AccountClonerError::AccountUpdatesError)?;
488+
.await?;
433489

434490
// Fetch the account, repeat and retry until we have a satisfactory response
435491
let mut fetch_count = 0;
@@ -491,6 +547,8 @@ where
491547
});
492548
}
493549

550+
// Fee payer accounts are non-delegated ones, so we keep track of them as well
551+
self.track_not_delegated_account(*pubkey).await?;
494552
match self.validator_charges_fees {
495553
ValidatorCollectionMode::NoFees => self
496554
.do_clone_feepayer_account_for_non_charging_validator(
@@ -602,6 +660,9 @@ where
602660
at_slot: account_chain_snapshot.at_slot,
603661
});
604662
}
663+
// Keep track of non-delegated accounts, removing any stale ones,
664+
// which were evicted from monitored accounts cache
665+
self.track_not_delegated_account(*pubkey).await?;
605666
self.do_clone_undelegated_account(pubkey, account)?
606667
}
607668
}
@@ -612,6 +673,13 @@ where
612673
delegation_record,
613674
..
614675
} => {
676+
// In case the account was promoted from the not-delegated to the delegated state, we
677+
// remove it from list of monitored accounts, to avoid removal on eviction
678+
self.monitored_accounts.borrow_mut().pop(pubkey);
679+
metrics::adjust_monitored_accounts_count(
680+
self.monitored_accounts.borrow().len(),
681+
);
682+
615683
if !self.permissions.allow_cloning_delegated_accounts {
616684
return Ok(AccountClonerOutput::Unclonable {
617685
pubkey: *pubkey,

magicblock-account-cloner/tests/remote_account_cloner.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ fn setup_custom(
3737
// Default configuration
3838
let payer_init_lamports = Some(1_000 * LAMPORTS_PER_SOL);
3939
// Create account cloner worker and client
40-
let mut cloner_worker = RemoteAccountClonerWorker::new(
40+
let cloner_worker = RemoteAccountClonerWorker::new(
4141
internal_account_provider,
4242
account_fetcher,
4343
account_updates,
@@ -48,17 +48,17 @@ fn setup_custom(
4848
ValidatorCollectionMode::NoFees,
4949
permissions,
5050
Pubkey::new_unique(),
51+
1024,
5152
);
5253
let cloner_client = RemoteAccountClonerClient::new(&cloner_worker);
5354
// Run the worker in a separate task
5455
let cancellation_token = CancellationToken::new();
5556
let cloner_worker_handle = {
5657
let cloner_cancellation_token = cancellation_token.clone();
57-
tokio::spawn(async move {
58+
tokio::spawn(
5859
cloner_worker
59-
.start_clone_request_processing(cloner_cancellation_token)
60-
.await
61-
})
60+
.start_clone_request_processing(cloner_cancellation_token),
61+
)
6262
};
6363
// Ready to run
6464
(cloner_client, cancellation_token, cloner_worker_handle)

magicblock-account-updates/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ license.workspace = true
88
edition.workspace = true
99

1010
[dependencies]
11+
magicblock-metrics = { workspace = true }
1112
conjunto-transwise = { workspace = true }
1213
futures-util = { workspace = true }
1314
log = { workspace = true }

magicblock-account-updates/src/account_updates.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use solana_sdk::{clock::Slot, pubkey::Pubkey};
22
use thiserror::Error;
3+
use tokio::sync::mpsc::error::SendError;
34

45
#[derive(Debug, Clone, Error)]
56
pub enum AccountUpdatesError {
67
#[error(transparent)]
7-
SendError(#[from] tokio::sync::mpsc::error::SendError<Pubkey>),
8+
SendError(#[from] SendError<(Pubkey, bool)>),
89
}
910

1011
pub type AccountUpdatesResult<T> = Result<T, AccountUpdatesError>;
@@ -15,6 +16,13 @@ pub trait AccountUpdates {
1516
&self,
1617
pubkey: &Pubkey,
1718
) -> AccountUpdatesResult<()>;
19+
#[allow(async_fn_in_trait)]
20+
async fn stop_account_monitoring(
21+
&self,
22+
_pubkey: &Pubkey,
23+
) -> AccountUpdatesResult<()> {
24+
Ok(())
25+
}
1826
fn get_first_subscribed_slot(&self, pubkey: &Pubkey) -> Option<Slot>;
1927
fn get_last_known_update_slot(&self, pubkey: &Pubkey) -> Option<Slot>;
2028
}

magicblock-account-updates/src/remote_account_updates_client.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tokio::sync::mpsc::Sender;
99
use crate::{AccountUpdates, AccountUpdatesError, RemoteAccountUpdatesWorker};
1010

1111
pub struct RemoteAccountUpdatesClient {
12-
monitoring_request_sender: Sender<Pubkey>,
12+
monitoring_request_sender: Sender<(Pubkey, bool)>,
1313
first_subscribed_slots: Arc<RwLock<HashMap<Pubkey, Slot>>>,
1414
last_known_update_slots: Arc<RwLock<HashMap<Pubkey, Slot>>>,
1515
}
@@ -30,17 +30,29 @@ impl AccountUpdates for RemoteAccountUpdatesClient {
3030
pubkey: &Pubkey,
3131
) -> Result<(), AccountUpdatesError> {
3232
self.monitoring_request_sender
33-
.send(*pubkey)
33+
.send((*pubkey, false))
3434
.await
35-
.map_err(AccountUpdatesError::SendError)
35+
.map_err(Into::into)
3636
}
37+
38+
async fn stop_account_monitoring(
39+
&self,
40+
pubkey: &Pubkey,
41+
) -> Result<(), AccountUpdatesError> {
42+
self.monitoring_request_sender
43+
.send((*pubkey, true))
44+
.await
45+
.map_err(Into::into)
46+
}
47+
3748
fn get_first_subscribed_slot(&self, pubkey: &Pubkey) -> Option<Slot> {
3849
self.first_subscribed_slots
3950
.read()
4051
.expect("RwLock of RemoteAccountUpdatesClient.first_subscribed_slots poisoned")
4152
.get(pubkey)
4253
.cloned()
4354
}
55+
4456
fn get_last_known_update_slot(&self, pubkey: &Pubkey) -> Option<Slot> {
4557
self.last_known_update_slots
4658
.read()

0 commit comments

Comments
 (0)