Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
999953c
chore: subscribe sequentially and remove possible LRU cache race cond…
thlorenz Oct 30, 2025
a0774c2
fix: cleanup subscriptions and resubscribe on upstream connection close
thlorenz Oct 30, 2025
fb6e73e
feat: sub metrics via LRU cache
thlorenz Oct 31, 2025
dae6cc4
feat: clients also return subscription count
thlorenz Oct 31, 2025
7ecfc4a
chore: warn if LRU cache count is not matching pubsub count
thlorenz Oct 31, 2025
533ccde
chore: fix unsub on already evicted + metric counts
thlorenz Nov 2, 2025
638622d
chore: log discrepant account
thlorenz Nov 2, 2025
f6a0270
chore: don't remove pubkey from LRU cache if unsub fails
thlorenz Nov 2, 2025
402e0b7
chore: only removing sub when unsubscribe completed
thlorenz Nov 2, 2025
8d31a09
chore: improve subs logging
thlorenz Nov 2, 2025
754663f
chore: simplify unsub and remove invalid resub
thlorenz Nov 2, 2025
7943a7d
chore: eliminate sub/unsub race condition
thlorenz Nov 2, 2025
361b0da
chore: prevent overwriting existing sub
thlorenz Nov 2, 2025
33430a3
chore: tracing fetch + subs for cleaner debug logs
thlorenz Nov 2, 2025
98351e7
chore: minor comments
thlorenz Nov 3, 2025
56e05f5
chore: update correct metric + log on info for now
thlorenz Nov 3, 2025
399c37d
chore: add metrics query to test context
thlorenz Nov 3, 2025
d12b74f
chore: add manual ix test to diagnose subscriptions
thlorenz Nov 3, 2025
0d638f4
chore: merge bmuddha/fix/ws-reconnects, adjusting the changes
thlorenz Nov 3, 2025
5ec5287
chore: adding warn logs when recverr occurs
thlorenz Nov 4, 2025
e64166c
chore: fix max log level override
thlorenz Nov 4, 2025
07b170e
chore: more robust handling of fetch failure
thlorenz Nov 4, 2025
0c7a7cd
chore: fix recycle connections deadlock
thlorenz Nov 5, 2025
21f91d1
chore: clippy
thlorenz Nov 5, 2025
cf69692
fix: the extra task was overkill and not awaited
thlorenz Nov 5, 2025
c193be6
chore: log delegation issues on debug
thlorenz Nov 5, 2025
9dce1a9
fix: rely on cancellation tokens, remove join_set to fix endles recyc…
thlorenz Nov 5, 2025
9f0a972
chore: recycle with backoff
thlorenz Nov 5, 2025
7d635c4
chore: try to resub before recycle
thlorenz Nov 5, 2025
bf366ae
chore: pubsub client includes client id in all logs
thlorenz Nov 5, 2025
a4984d4
feat: better orchestrated reconnection logic
thlorenz Nov 6, 2025
fece0cd
chore: test reconnection logic
thlorenz Nov 6, 2025
cf9082a
Merge branch 'master' into thlorenz/subscription-metrics
thlorenz Nov 6, 2025
6283d23
chore: fmt ix tests
thlorenz Nov 6, 2025
889c46e
chore: clarifying comment
thlorenz Nov 6, 2025
22c1817
chore: fix reconnect ix test
thlorenz Nov 6, 2025
1058443
chore: logging precise info about which kind of accounts were in bank…
thlorenz Nov 7, 2025
31b49e5
chore: defensive refetch of accounts we should have been watching
thlorenz Nov 7, 2025
a81e9bb
chore: ensure we don't count removed accounts twice
thlorenz Nov 7, 2025
7fd5ec6
chore: more logs
thlorenz Nov 7, 2025
2bdd9ff
chore: debug fetched accounts
thlorenz Nov 7, 2025
908733f
chore: more info when removing account from bank
thlorenz Nov 7, 2025
37b9d32
chore: triaging those empty accounts
thlorenz Nov 7, 2025
b284277
chore: minor cleanup
thlorenz Nov 7, 2025
c9c7f8d
fix: rollback LRU entry when eviction unsubscribe fails
thlorenz Nov 7, 2025
fdbaed2
fix: error handling in subscription limits test to propagate task pan…
thlorenz Nov 7, 2025
405d4ff
fix: incorrect non-empty account count in chainlink logging
thlorenz Nov 7, 2025
3e7b400
chore: lint + fmt
thlorenz Nov 7, 2025
3bc011d
Merge branch 'master' into thlorenz/subscription-metrics
thlorenz Nov 7, 2025
61d475a
chore: less fetch account chatter on debug
thlorenz Nov 7, 2025
14ecd09
fix: don't persist empty accounts after txn execution
bmuddha Nov 7, 2025
8714d6a
chore: fix stale comments
thlorenz Nov 7, 2025
ab75627
fix: preserve concurrent fetch waiters in remote account provider
thlorenz Nov 7, 2025
4ef27a0
fix: log level issue
thlorenz Nov 7, 2025
7dd4665
fix: account overwrite issue
thlorenz Nov 7, 2025
ef698ee
chore: remove redundant fetch_sub call in magicblock-chainlink/src/ch…
thlorenz Nov 7, 2025
ca4f9c7
chore: stop spamming debug with not found accounts
thlorenz Nov 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jsonrpc-pubsub = "18.0.0"
jsonrpc-ws-server = "18.0.0"
lazy_static = "1.4.0"
libc = "0.2.153"
log = { version = "0.4.20", features = ["release_max_level_info"] }
log = { version = "0.4.20" }
lru = "0.16.0"
macrotest = "1"
magic-domain-program = { git = "https://github.com/magicblock-labs/magic-domain-program.git", rev = "ea04d46", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions magicblock-accounts-db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{path::Path, sync::Arc};
use std::{collections::HashSet, path::Path, sync::Arc};

use error::AccountsDbError;
use index::{
Expand Down Expand Up @@ -356,7 +356,7 @@ impl AccountsBank for AccountsDb {
.iter_all()
.filter(|(pk, acc)| predicate(pk, acc))
.map(|(pk, _)| pk)
.collect::<Vec<_>>();
.collect::<HashSet<_>>();
let removed = to_remove.len();
for pk in to_remove {
self.remove_account(&pk);
Expand Down
2 changes: 1 addition & 1 deletion magicblock-aperture/src/requests/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl HttpDispatcher {
.inspect_err(|e| {
// There is nothing we can do if fetching the account fails
// Log the error and return whatever is in the accounts db
warn!("Failed to ensure account {pubkey}: {e}");
debug!("Failed to ensure account {pubkey}: {e}");
});
self.accountsdb.get_account(pubkey)
}
Expand Down
2 changes: 1 addition & 1 deletion magicblock-aperture/src/requests/http/send_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use log::{debug, trace};
use log::*;
use magicblock_metrics::metrics::{
TRANSACTION_PROCESSING_TIME, TRANSACTION_SKIP_PREFLIGHT,
};
Expand Down
4 changes: 3 additions & 1 deletion magicblock-chainlink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version.workspace = true
edition.workspace = true

[dependencies]
arc-swap = "1.7"
async-trait = { workspace = true }
bincode = { workspace = true }
env_logger = { workspace = true }
Expand All @@ -12,7 +13,8 @@ log = { workspace = true }
lru = { workspace = true }
magicblock-core = { workspace = true }
magicblock-magic-program-api = { workspace = true }
magicblock-delegation-program = { workspace = true }
magicblock-metrics = { workspace = true }
magicblock-delegation-program = { workspace = true }
serde_json = { workspace = true }
solana-account = { workspace = true }
solana-account-decoder = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion magicblock-chainlink/src/chainlink/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub enum ChainlinkError {
#[error("Cloner error: {0}")]
ClonerError(#[from] crate::cloner::errors::ClonerError),

#[error("Delegation could not be decoded: {0} ({1:?})")]
#[error("Delegation record could not be decoded: {0} ({1:?})")]
InvalidDelegationRecord(Pubkey, ProgramError),

#[error("Failed to resolve one or more accounts {0} when getting delegation records")]
Expand Down
49 changes: 35 additions & 14 deletions magicblock-chainlink/src/chainlink/fetch_cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ where
// For accounts we couldn't find we cannot do anything. We will let code depending
// on them to be in the bank fail on its own
if !not_found.is_empty() {
debug!(
trace!(
"Could not find accounts on chain: {:?}",
not_found
.iter()
Expand Down Expand Up @@ -960,24 +960,37 @@ where
.lock()
.expect("pending_requests lock poisoned");

for &pubkey in pubkeys {
// Check synchronously if account is in bank
if self.accounts_bank.get_account(&pubkey).is_some() {
// Account is already in bank, we can skip it as it will be handled
// by the existing fetch_and_clone_accounts logic when needed
continue;
for pubkey in pubkeys {
// Check synchronously if account is in bank and subscribed when it should be
if let Some(account_in_bank) =
self.accounts_bank.get_account(pubkey)
{
// NOTE: we defensively correct accounts that we should have been watching but
// were not for some reason. We fetch them again in that case.
// This actually would point to a bug in the subscription logic.
// TODO(thlorenz): remove this once we are certain (by perusing logs) that this
// does not happen anymore
if account_in_bank.owner().eq(&dlp::id())
|| account_in_bank.delegated()
|| self.blacklisted_accounts.contains(pubkey)
|| self.is_watching(pubkey)
{
continue;
} else if !self.is_watching(pubkey) {
debug!("Account {pubkey} should be watched but wasn't");
}
}

// Check if account fetch is already pending
if let Some(requests) = pending.get_mut(&pubkey) {
if let Some(requests) = pending.get_mut(pubkey) {
let (sender, receiver) = oneshot::channel();
requests.push(sender);
await_pending.push((pubkey, receiver));
await_pending.push((*pubkey, receiver));
continue;
}

// Account needs to be fetched - add to fetch list
fetch_new.push(pubkey);
fetch_new.push(*pubkey);
}

// Create pending entries for accounts we need to fetch
Expand Down Expand Up @@ -1024,9 +1037,14 @@ where

// Wait for any pending requests to complete
let mut joinset = JoinSet::new();
for (_, receiver) in await_pending {
for (pubkey, receiver) in await_pending {
joinset.spawn(async move {
if let Err(err) = receiver.await {
if let Err(err) = receiver
.await
.inspect_err(|err| {
warn!("FetchCloner::clone_accounts - RecvError occurred while awaiting account {}: {err:?}. This indicates the account fetch sender was dropped without sending a value.", pubkey);
})
{
// The sender was dropped, likely due to an error in the other request
error!(
"Failed to receive account from pending request: {err}"
Expand Down Expand Up @@ -1499,9 +1517,12 @@ mod tests {
rpc_client,
pubsub_client,
forward_tx,
&RemoteAccountProviderConfig::default_with_lifecycle_mode(
&RemoteAccountProviderConfig::try_new_with_metrics(
1000,
LifecycleMode::Ephemeral,
),
false,
)
.unwrap(),
)
.await
.unwrap(),
Expand Down
67 changes: 59 additions & 8 deletions magicblock-chainlink/src/chainlink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::Arc;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

use dlp::pda::ephemeral_balance_pda_from_payer;
use errors::ChainlinkResult;
Expand Down Expand Up @@ -136,15 +139,55 @@ impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
pub fn reset_accounts_bank(&self) {
let blacklisted_accounts =
blacklisted_accounts(&self.validator_id, &self.faucet_id);

let delegated = AtomicU64::new(0);
let dlp_owned_not_delegated = AtomicU64::new(0);
let blacklisted = AtomicU64::new(0);
let remaining = AtomicU64::new(0);
let remaining_empty = AtomicU64::new(0);

let removed = self.accounts_bank.remove_where(|pubkey, account| {
(!account.delegated()
// This fixes the edge-case of accounts that were in the process of
// being undelegated but never completed while the validator was running
|| account.owner().eq(&dlp::id()))
&& !blacklisted_accounts.contains(pubkey)
if blacklisted_accounts.contains(pubkey) {
blacklisted.fetch_add(1, Ordering::Relaxed);
return false;
}
if account.delegated() {
delegated.fetch_add(1, Ordering::Relaxed);
return false;
}
if account.owner().eq(&dlp::id()) {
dlp_owned_not_delegated.fetch_add(1, Ordering::Relaxed);
return true;
}
trace!(
"Removing non-delegated, non-DLP-owned account: {pubkey} {:#?}",
account
);
remaining.fetch_add(1, Ordering::Relaxed);
if account.lamports() == 0
&& account.owner().ne(&solana_sdk::feature::id())
{
remaining_empty.fetch_add(1, Ordering::Relaxed);
}
true
});

debug!("Removed {removed} non-delegated accounts");
let non_empty = remaining
.load(Ordering::Relaxed)
.saturating_sub(remaining_empty.load(Ordering::Relaxed));

info!(
"Removed {removed} accounts from bank:
{} DLP-owned non-delegated
{} non-delegated non-blacklisted, no-feature non-empty.
{} non-delegated non-blacklisted empty
Kept: {} delegated, {} blacklisted",
dlp_owned_not_delegated.into_inner(),
non_empty,
remaining_empty.into_inner(),
delegated.into_inner(),
blacklisted.into_inner()
);
}

fn subscribe_account_removals(
Expand Down Expand Up @@ -283,7 +326,15 @@ impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
.map(|p| p.to_string())
.collect::<Vec<_>>()
.join(", ");
trace!("Fetching accounts: {pubkeys_str}");
let mark_empty_str = mark_empty_if_not_found
.map(|keys| {
keys.iter()
.map(|p| p.to_string())
.collect::<Vec<_>>()
.join(", ")
})
.unwrap_or_default();
trace!("Fetching accounts: {pubkeys_str}, mark_empty_if_not_found: {mark_empty_str}");
}
Self::promote_accounts(
fetch_cloner,
Expand Down
Loading
Loading