diff --git a/Cargo.lock b/Cargo.lock index 5ff4a459f..435526eb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3635,6 +3635,7 @@ dependencies = [ name = "magicblock-chainlink" version = "0.2.3" dependencies = [ + "arc-swap", "assert_matches", "async-trait", "bincode", @@ -3646,6 +3647,7 @@ dependencies = [ "magicblock-core", "magicblock-delegation-program", "magicblock-magic-program-api", + "magicblock-metrics", "serde_json", "solana-account", "solana-account-decoder", diff --git a/Cargo.toml b/Cargo.toml index b7c9686d8..216142012 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ jsonrpc-pubsub = "18.0.0" jsonrpc-ws-server = "18.0.0" lazy_static = "1.4.0" libc = "0.2.153" -log = { version = "0.4.20", features = ["release_max_level_info"] } +log = { version = "0.4.20" } lru = "0.16.0" macrotest = "1" magic-domain-program = { git = "https://github.com/magicblock-labs/magic-domain-program.git", rev = "ea04d46", default-features = false } diff --git a/magicblock-accounts-db/src/lib.rs b/magicblock-accounts-db/src/lib.rs index 8f714fe72..74b516c20 100644 --- a/magicblock-accounts-db/src/lib.rs +++ b/magicblock-accounts-db/src/lib.rs @@ -1,4 +1,4 @@ -use std::{path::Path, sync::Arc}; +use std::{collections::HashSet, path::Path, sync::Arc}; use error::AccountsDbError; use index::{ @@ -356,7 +356,7 @@ impl AccountsBank for AccountsDb { .iter_all() .filter(|(pk, acc)| predicate(pk, acc)) .map(|(pk, _)| pk) - .collect::>(); + .collect::>(); let removed = to_remove.len(); for pk in to_remove { self.remove_account(&pk); diff --git a/magicblock-aperture/src/requests/http/mod.rs b/magicblock-aperture/src/requests/http/mod.rs index 4c1897edd..ea599e999 100644 --- a/magicblock-aperture/src/requests/http/mod.rs +++ b/magicblock-aperture/src/requests/http/mod.rs @@ -112,7 +112,7 @@ impl HttpDispatcher { .inspect_err(|e| { // There is nothing we can do if fetching the account fails // Log the error and return whatever is in the accounts db - warn!("Failed to ensure account {pubkey}: {e}"); + debug!("Failed to ensure account {pubkey}: {e}"); }); self.accountsdb.get_account(pubkey) } diff --git a/magicblock-aperture/src/requests/http/send_transaction.rs b/magicblock-aperture/src/requests/http/send_transaction.rs index 67f1c446c..9bf1f7012 100644 --- a/magicblock-aperture/src/requests/http/send_transaction.rs +++ b/magicblock-aperture/src/requests/http/send_transaction.rs @@ -1,4 +1,4 @@ -use log::{debug, trace}; +use log::*; use magicblock_metrics::metrics::{ TRANSACTION_PROCESSING_TIME, TRANSACTION_SKIP_PREFLIGHT, }; diff --git a/magicblock-chainlink/Cargo.toml b/magicblock-chainlink/Cargo.toml index 71e7eb0f4..ea342b02f 100644 --- a/magicblock-chainlink/Cargo.toml +++ b/magicblock-chainlink/Cargo.toml @@ -4,6 +4,7 @@ version.workspace = true edition.workspace = true [dependencies] +arc-swap = "1.7" async-trait = { workspace = true } bincode = { workspace = true } env_logger = { workspace = true } @@ -12,7 +13,8 @@ log = { workspace = true } lru = { workspace = true } magicblock-core = { workspace = true } magicblock-magic-program-api = { workspace = true } -magicblock-delegation-program = { workspace = true } +magicblock-metrics = { workspace = true } + magicblock-delegation-program = { workspace = true } serde_json = { workspace = true } solana-account = { workspace = true } solana-account-decoder = { workspace = true } diff --git a/magicblock-chainlink/src/chainlink/errors.rs b/magicblock-chainlink/src/chainlink/errors.rs index 5e0d44771..09e9c4cce 100644 --- 
a/magicblock-chainlink/src/chainlink/errors.rs +++ b/magicblock-chainlink/src/chainlink/errors.rs @@ -18,7 +18,7 @@ pub enum ChainlinkError { #[error("Cloner error: {0}")] ClonerError(#[from] crate::cloner::errors::ClonerError), - #[error("Delegation could not be decoded: {0} ({1:?})")] + #[error("Delegation record could not be decoded: {0} ({1:?})")] InvalidDelegationRecord(Pubkey, ProgramError), #[error("Failed to resolve one or more accounts {0} when getting delegation records")] diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner.rs b/magicblock-chainlink/src/chainlink/fetch_cloner.rs index 6216ea5fd..a8b205fcf 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner.rs @@ -581,7 +581,7 @@ where // For accounts we couldn't find we cannot do anything. We will let code depending // on them to be in the bank fail on its own if !not_found.is_empty() { - debug!( + trace!( "Could not find accounts on chain: {:?}", not_found .iter() @@ -960,24 +960,37 @@ where .lock() .expect("pending_requests lock poisoned"); - for &pubkey in pubkeys { - // Check synchronously if account is in bank - if self.accounts_bank.get_account(&pubkey).is_some() { - // Account is already in bank, we can skip it as it will be handled - // by the existing fetch_and_clone_accounts logic when needed - continue; + for pubkey in pubkeys { + // Check synchronously if account is in bank and subscribed when it should be + if let Some(account_in_bank) = + self.accounts_bank.get_account(pubkey) + { + // NOTE: we defensively correct accounts that we should have been watching but + // were not for some reason. We fetch them again in that case. + // This actually would point to a bug in the subscription logic. + // TODO(thlorenz): remove this once we are certain (by perusing logs) that this + // does not happen anymore + if account_in_bank.owner().eq(&dlp::id()) + || account_in_bank.delegated() + || self.blacklisted_accounts.contains(pubkey) + || self.is_watching(pubkey) + { + continue; + } else if !self.is_watching(pubkey) { + debug!("Account {pubkey} should be watched but wasn't"); + } } // Check if account fetch is already pending - if let Some(requests) = pending.get_mut(&pubkey) { + if let Some(requests) = pending.get_mut(pubkey) { let (sender, receiver) = oneshot::channel(); requests.push(sender); - await_pending.push((pubkey, receiver)); + await_pending.push((*pubkey, receiver)); continue; } // Account needs to be fetched - add to fetch list - fetch_new.push(pubkey); + fetch_new.push(*pubkey); } // Create pending entries for accounts we need to fetch @@ -1024,9 +1037,14 @@ where // Wait for any pending requests to complete let mut joinset = JoinSet::new(); - for (_, receiver) in await_pending { + for (pubkey, receiver) in await_pending { joinset.spawn(async move { - if let Err(err) = receiver.await { + if let Err(err) = receiver + .await + .inspect_err(|err| { + warn!("FetchCloner::clone_accounts - RecvError occurred while awaiting account {}: {err:?}. 
This indicates the account fetch sender was dropped without sending a value.", pubkey); + }) + { // The sender was dropped, likely due to an error in the other request error!( "Failed to receive account from pending request: {err}" @@ -1499,9 +1517,12 @@ mod tests { rpc_client, pubsub_client, forward_tx, - &RemoteAccountProviderConfig::default_with_lifecycle_mode( + &RemoteAccountProviderConfig::try_new_with_metrics( + 1000, LifecycleMode::Ephemeral, - ), + false, + ) + .unwrap(), ) .await .unwrap(), diff --git a/magicblock-chainlink/src/chainlink/mod.rs b/magicblock-chainlink/src/chainlink/mod.rs index 7d0fd7795..cef5d55de 100644 --- a/magicblock-chainlink/src/chainlink/mod.rs +++ b/magicblock-chainlink/src/chainlink/mod.rs @@ -1,4 +1,7 @@ -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use dlp::pda::ephemeral_balance_pda_from_payer; use errors::ChainlinkResult; @@ -136,15 +139,55 @@ impl pub fn reset_accounts_bank(&self) { let blacklisted_accounts = blacklisted_accounts(&self.validator_id, &self.faucet_id); + + let delegated = AtomicU64::new(0); + let dlp_owned_not_delegated = AtomicU64::new(0); + let blacklisted = AtomicU64::new(0); + let remaining = AtomicU64::new(0); + let remaining_empty = AtomicU64::new(0); + let removed = self.accounts_bank.remove_where(|pubkey, account| { - (!account.delegated() - // This fixes the edge-case of accounts that were in the process of - // being undelegated but never completed while the validator was running - || account.owner().eq(&dlp::id())) - && !blacklisted_accounts.contains(pubkey) + if blacklisted_accounts.contains(pubkey) { + blacklisted.fetch_add(1, Ordering::Relaxed); + return false; + } + if account.delegated() { + delegated.fetch_add(1, Ordering::Relaxed); + return false; + } + if account.owner().eq(&dlp::id()) { + dlp_owned_not_delegated.fetch_add(1, Ordering::Relaxed); + return true; + } + trace!( + "Removing non-delegated, non-DLP-owned account: {pubkey} {:#?}", + account + ); + remaining.fetch_add(1, Ordering::Relaxed); + if account.lamports() == 0 + && account.owner().ne(&solana_sdk::feature::id()) + { + remaining_empty.fetch_add(1, Ordering::Relaxed); + } + true }); - debug!("Removed {removed} non-delegated accounts"); + let non_empty = remaining + .load(Ordering::Relaxed) + .saturating_sub(remaining_empty.load(Ordering::Relaxed)); + + info!( + "Removed {removed} accounts from bank: +{} DLP-owned non-delegated +{} non-delegated non-blacklisted, no-feature non-empty. 
+{} non-delegated non-blacklisted empty +Kept: {} delegated, {} blacklisted", + dlp_owned_not_delegated.into_inner(), + non_empty, + remaining_empty.into_inner(), + delegated.into_inner(), + blacklisted.into_inner() + ); } fn subscribe_account_removals( @@ -283,7 +326,15 @@ impl .map(|p| p.to_string()) .collect::>() .join(", "); - trace!("Fetching accounts: {pubkeys_str}"); + let mark_empty_str = mark_empty_if_not_found + .map(|keys| { + keys.iter() + .map(|p| p.to_string()) + .collect::>() + .join(", ") + }) + .unwrap_or_default(); + trace!("Fetching accounts: {pubkeys_str}, mark_empty_if_not_found: {mark_empty_str}"); } Self::promote_accounts( fetch_cloner, diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index 030bf93bb..a8259f71e 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -1,22 +1,30 @@ use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, fmt, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicBool, AtomicU16, Ordering}, + Arc, Mutex, + }, }; use log::*; use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding}; use solana_pubkey::Pubkey; -use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient; use solana_rpc_client_api::{ config::RpcAccountInfoConfig, response::Response as RpcResponse, }; use solana_sdk::{commitment_config::CommitmentConfig, sysvar::clock}; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + sync::{mpsc, oneshot}, + time::Duration, +}; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; -use super::errors::{RemoteAccountProviderError, RemoteAccountProviderResult}; +use super::{ + chain_pubsub_client::PubSubConnection, + errors::{RemoteAccountProviderError, RemoteAccountProviderResult}, +}; // Log every 10 secs (given chain slot time is 400ms) const CLOCK_LOG_SLOT_FREQ: u64 = 25; @@ -65,21 +73,25 @@ struct AccountSubscription { pub struct ChainPubsubActor { /// Configuration used to create the pubsub client pubsub_client_config: PubsubClientConfig, - /// Underlying pubsub client to connect to the chain - pubsub_client: Arc, + /// Underlying pubsub connection to connect to the chain + pubsub_connection: Arc, /// Sends subscribe/unsubscribe messages to this actor messages_sender: mpsc::Sender, /// Map of subscriptions we are holding subscriptions: Arc>>, /// Sends updates for any account subscription that is received via - /// the [Self::pubsub_client] + /// the [Self::pubsub_connection] subscription_updates_sender: mpsc::Sender, - /// The tasks that watch subscriptions via the [Self::pubsub_client] and - /// channel them into the [Self::subscription_updates_sender] - subscription_watchers: Arc>>, /// The token to use to cancel all subscriptions and shut down the /// message listener, essentially shutting down whis actor shutdown_token: CancellationToken, + /// Unique client ID for this actor instance used in logs + client_id: u16, + /// Indicates whether the actor is connected or has been disconnected due to RPC connection + /// issues + is_connected: Arc, + /// Channel used to signal connection issues to the submux + abort_sender: mpsc::Sender<()>, } #[derive(Debug)] @@ -92,7 +104,7 @@ pub enum ChainPubsubActorMessage { pubkey: Pubkey, response: oneshot::Sender>, }, - RecycleConnections { + Reconnect { response: oneshot::Sender>, }, } @@ -103,36 +115,40 @@ const MESSAGE_CHANNEL_SIZE:
usize = 1_000; impl ChainPubsubActor { pub async fn new_from_url( pubsub_url: &str, + abort_sender: mpsc::Sender<()>, commitment: CommitmentConfig, ) -> RemoteAccountProviderResult<(Self, mpsc::Receiver)> { let config = PubsubClientConfig::from_url(pubsub_url, commitment); - Self::new(config).await + Self::new(abort_sender, config).await } pub async fn new( + abort_sender: mpsc::Sender<()>, pubsub_client_config: PubsubClientConfig, ) -> RemoteAccountProviderResult<(Self, mpsc::Receiver)> { - let pubsub_client = Arc::new( - PubsubClient::new(pubsub_client_config.pubsub_url.as_str()).await?, - ); + static CLIENT_ID: AtomicU16 = AtomicU16::new(0); + + let url = pubsub_client_config.pubsub_url.clone(); + let pubsub_connection = Arc::new(PubSubConnection::new(url).await?); let (subscription_updates_sender, subscription_updates_receiver) = mpsc::channel(SUBSCRIPTION_UPDATE_CHANNEL_SIZE); let (messages_sender, messages_receiver) = mpsc::channel(MESSAGE_CHANNEL_SIZE); - let subscription_watchers = - Arc::new(Mutex::new(tokio::task::JoinSet::new())); + let shutdown_token = CancellationToken::new(); let me = Self { pubsub_client_config, - pubsub_client, + pubsub_connection, messages_sender, subscriptions: Default::default(), subscription_updates_sender, - subscription_watchers, shutdown_token, + client_id: CLIENT_ID.fetch_add(1, Ordering::SeqCst), + is_connected: Arc::new(AtomicBool::new(true)), + abort_sender, }; me.start_worker(messages_receiver); @@ -142,7 +158,10 @@ impl ChainPubsubActor { } pub async fn shutdown(&self) { - info!("Shutting down ChainPubsubActor"); + info!( + "[client_id={}] Shutting down ChainPubsubActor", + self.client_id + ); let subs = self .subscriptions .lock() @@ -153,9 +172,34 @@ impl ChainPubsubActor { sub.cancellation_token.cancel(); } self.shutdown_token.cancel(); - // TODO: - // let mut subs = self.subscription_watchers.lock().unwrap();; - // subs.join_all().await; + } + + pub fn subscription_count(&self, filter: &[Pubkey]) -> usize { + if !self.is_connected.load(Ordering::SeqCst) { + return 0; + } + let subs = self + .subscriptions + .lock() + .expect("subscriptions lock poisoned"); + if filter.is_empty() { + subs.len() + } else { + subs.keys() + .filter(|pubkey| !filter.contains(pubkey)) + .count() + } + } + + pub fn subscriptions(&self) -> Vec { + if !self.is_connected.load(Ordering::SeqCst) { + return vec![]; + } + let subs = self + .subscriptions + .lock() + .expect("subscriptions lock poisoned"); + subs.keys().copied().collect() } pub async fn send_msg( @@ -175,23 +219,27 @@ impl ChainPubsubActor { mut messages_receiver: mpsc::Receiver, ) { let subs = self.subscriptions.clone(); - let subscription_watchers = self.subscription_watchers.clone(); let shutdown_token = self.shutdown_token.clone(); let pubsub_client_config = self.pubsub_client_config.clone(); let subscription_updates_sender = self.subscription_updates_sender.clone(); - let mut pubsub_client = self.pubsub_client.clone(); + let pubsub_connection = self.pubsub_connection.clone(); + let client_id = self.client_id; + let is_connected = self.is_connected.clone(); + let abort_sender = self.abort_sender.clone(); tokio::spawn(async move { loop { tokio::select! 
{ msg = messages_receiver.recv() => { if let Some(msg) = msg { - pubsub_client = Self::handle_msg( + Self::handle_msg( subs.clone(), - pubsub_client.clone(), - subscription_watchers.clone(), + pubsub_connection.clone(), subscription_updates_sender.clone(), pubsub_client_config.clone(), + abort_sender.clone(), + client_id, + is_connected.clone(), msg ).await; } else { @@ -206,105 +254,152 @@ impl ChainPubsubActor { }); } + #[allow(clippy::too_many_arguments)] async fn handle_msg( subscriptions: Arc>>, - pubsub_client: Arc, - subscription_watchers: Arc>>, + pubsub_connection: Arc, subscription_updates_sender: mpsc::Sender, pubsub_client_config: PubsubClientConfig, + abort_sender: mpsc::Sender<()>, + client_id: u16, + is_connected: Arc, msg: ChainPubsubActorMessage, - ) -> Arc { + ) { + fn send_ok( + response: oneshot::Sender>, + client_id: u16, + ) { + let _ = response.send(Ok(())).inspect_err(|err| { + warn!( + "[client_id={client_id}] Failed to send msg ack: {err:?}" + ); + }); + } + match msg { ChainPubsubActorMessage::AccountSubscribe { pubkey, response } => { + if !is_connected.load(Ordering::SeqCst) { + trace!("[client_id={client_id}] Ignoring subscribe request for {pubkey} because disconnected"); + send_ok(response, client_id); + return; + } let commitment_config = pubsub_client_config.commitment_config; Self::add_sub( pubkey, response, subscriptions, - pubsub_client.clone(), - subscription_watchers, + pubsub_connection, subscription_updates_sender, + abort_sender, + is_connected, commitment_config, + client_id, ); - pubsub_client } ChainPubsubActorMessage::AccountUnsubscribe { pubkey, response, } => { + if !is_connected.load(Ordering::SeqCst) { + trace!("[client_id={client_id}] Ignoring unsubscribe request for {pubkey} because disconnected"); + send_ok(response, client_id); + return; + } if let Some(AccountSubscription { cancellation_token }) = - subscriptions.lock().unwrap().remove(&pubkey) + subscriptions + .lock() + .expect("subscriptions lock poisoned") + .get(&pubkey) { cancellation_token.cancel(); let _ = response.send(Ok(())); } else { - let _ = response + let _ = response .send(Err(RemoteAccountProviderError::AccountSubscriptionDoesNotExist( pubkey.to_string(), ))); } - pubsub_client } - ChainPubsubActorMessage::RecycleConnections { response } => { - match Self::recycle_connections( - subscriptions, - subscription_watchers, - subscription_updates_sender, + ChainPubsubActorMessage::Reconnect { response } => { + let result = Self::try_reconnect( + pubsub_connection, pubsub_client_config, + client_id, + is_connected, ) - .await - { - Ok(new_client) => { - let _ = response.send(Ok(())); - new_client - } - Err(err) => { - let _ = response.send(Err(err)); - pubsub_client - } - } + .await; + let _ = response.send(result); } } } + #[allow(clippy::too_many_arguments)] fn add_sub( pubkey: Pubkey, sub_response: oneshot::Sender>, subs: Arc>>, - pubsub_client: Arc, - subscription_watchers: Arc>>, + pubsub_connection: Arc, subscription_updates_sender: mpsc::Sender, + abort_sender: mpsc::Sender<()>, + is_connected: Arc, commitment_config: CommitmentConfig, + client_id: u16, ) { - trace!("Adding subscription for {pubkey} with commitment {commitment_config:?}"); + if subs + .lock() + .expect("subscriptions lock poisoned") + .contains_key(&pubkey) + { + trace!("[client_id={client_id}] Subscription for {pubkey} already exists, ignoring add_sub request"); + let _ = sub_response.send(Ok(())); + return; + } - let config = RpcAccountInfoConfig { - commitment: Some(commitment_config), - encoding:
Some(UiAccountEncoding::Base64Zstd), - ..Default::default() - }; + trace!("[client_id={client_id}] Adding subscription for {pubkey} with commitment {commitment_config:?}"); let cancellation_token = CancellationToken::new(); - let mut sub_joinset = subscription_watchers.lock().unwrap(); - sub_joinset.spawn(async move { - // Attempt to subscribe to the account - let (mut update_stream, unsubscribe) = match pubsub_client - .account_subscribe(&pubkey, Some(config)) - .await { + // Insert into subscriptions HashMap immediately to prevent race condition + // with unsubscribe operations + // Assuming that messages to this actor are processed in the order they are sent + // then this eliminates the possibility of an unsubscribe being processed before + // the sub's cancellation token was added to the map + { + let mut subs_lock = + subs.lock().expect("subscriptions lock poisoned"); + subs_lock.insert( + pubkey, + AccountSubscription { + cancellation_token: cancellation_token.clone(), + }, + ); + } + + tokio::spawn(async move { + let config = RpcAccountInfoConfig { + commitment: Some(commitment_config), + encoding: Some(UiAccountEncoding::Base64Zstd), + ..Default::default() + }; + let (mut update_stream, unsubscribe) = match pubsub_connection + .account_subscribe(&pubkey, config.clone()) + .await + { Ok(res) => res, Err(err) => { - let _ = sub_response.send(Err(err.into())); + error!("[client_id={client_id}] Failed to subscribe to account {pubkey} {err:?}"); + Self::abort_and_signal_connection_issue( + client_id, + subs.clone(), + abort_sender, + is_connected.clone(), + ); + return; } }; - // Then track the subscription and confirm to the requester that the - // subscription was made - subs.lock().unwrap().insert(pubkey, AccountSubscription { - cancellation_token: cancellation_token.clone(), - }); - + // RPC succeeded - confirm to the requester that the subscription was made let _ = sub_response.send(Ok(())); // Now keep listening for updates and relay them to the @@ -312,106 +407,129 @@ impl ChainPubsubActor { loop { tokio::select! { _ = cancellation_token.cancelled() => { - debug!("Subscription for {pubkey} was cancelled"); - unsubscribe().await; + trace!("[client_id={client_id}] Subscription for {pubkey} was cancelled"); break; } update = update_stream.next() => { if let Some(rpc_response) = update { if log_enabled!(log::Level::Trace) && (!pubkey.eq(&clock::ID) || rpc_response.context.slot % CLOCK_LOG_SLOT_FREQ == 0) { - trace!("Received update for {pubkey}: {rpc_response:?}"); + trace!("[client_id={client_id}] Received update for {pubkey}: {rpc_response:?}"); } let _ = subscription_updates_sender.send(SubscriptionUpdate { pubkey, rpc_response, }).await.inspect_err(|err| { - error!("Failed to send {pubkey} subscription update: {err:?}"); + error!("[client_id={client_id}] Failed to send {pubkey} subscription update: {err:?}"); }); } else { - debug!("Subscription for {pubkey} ended by update stream"); - break; + debug!("[client_id={client_id}] Subscription for {pubkey} ended (EOF); signaling connection issue"); + Self::abort_and_signal_connection_issue( + client_id, + subs.clone(), + abort_sender.clone(), + is_connected.clone(), + ); + // Return early - abort_and_signal_connection_issue cancels all + // subscriptions, triggering cleanup via the cancellation path + // above. No need to run unsubscribe/cleanup here. 
+ return; } } } + + // Clean up subscription with timeout to prevent hanging on dead sockets + if tokio::time::timeout(Duration::from_secs(2), unsubscribe()) + .await + .is_err() + { + warn!( + "[client_id={client_id}] unsubscribe timed out for {pubkey}" + ); + } + subs.lock() + .expect("subscriptions lock poisoned") + .remove(&pubkey); }); } - async fn recycle_connections( - subscriptions: Arc>>, - subscription_watchers: Arc>>, - subscription_updates_sender: mpsc::Sender, + async fn try_reconnect( + pubsub_connection: Arc, pubsub_client_config: PubsubClientConfig, - ) -> RemoteAccountProviderResult> { - debug!("RecycleConnections: starting recycle process"); - - // 1. Recreate the pubsub client, in case that fails leave the old one in place - // as this is the best we can do - debug!( - "RecycleConnections: creating new PubsubClient for {}", - pubsub_client_config.pubsub_url - ); - let new_client = match PubsubClient::new( - pubsub_client_config.pubsub_url.as_str(), - ) - .await - { - Ok(c) => Arc::new(c), - Err(err) => { - error!("RecycleConnections: failed to create new PubsubClient: {err:?}"); - return Err(err.into()); - } + client_id: u16, + is_connected: Arc, + ) -> RemoteAccountProviderResult<()> { + // 1. Try to reconnect the pubsub connection + if let Err(err) = pubsub_connection.reconnect().await { + debug!("[client_id={}] failed to reconnect: {err:?}", client_id); + return Err(err.into()); + } + // Make a sub to any account and unsub immediately to verify connection + let pubkey = Pubkey::new_unique(); + let config = RpcAccountInfoConfig { + commitment: Some(pubsub_client_config.commitment_config), + encoding: Some(UiAccountEncoding::Base64Zstd), + ..Default::default() }; - // Cancel all current subscriptions and collect pubkeys to re-subscribe later + // 2. Try to subscribe to an account to verify connection + let (_, unsubscribe) = + match pubsub_connection.account_subscribe(&pubkey, config).await { + Ok(res) => res, + Err(err) => { + error!( + "[client_id={}] failed to verify connection via subscribe {err:?}", + client_id + ); + return Err(err.into()); + } + }; + + // 3. Unsubscribe immediately + unsubscribe().await; + + // 4.
We are now connected again + is_connected.store(true, Ordering::SeqCst); + Ok(()) + } + + fn abort_and_signal_connection_issue( + client_id: u16, + subscriptions: Arc>>, + abort_sender: mpsc::Sender<()>, + is_connected: Arc, + ) { + // Only abort if we were connected; prevents duplicate aborts + if !is_connected.swap(false, Ordering::SeqCst) { + trace!( + "[client_id={client_id}] already disconnected, skipping abort" + ); + return; + } + + debug!("[client_id={client_id}] aborting"); + let drained = { let mut subs_lock = subscriptions.lock().unwrap(); std::mem::take(&mut *subs_lock) }; - let mut to_resubscribe = HashSet::new(); - for (pk, AccountSubscription { cancellation_token }) in drained { - to_resubscribe.insert(pk); + let drained_len = drained.len(); + for (_, AccountSubscription { cancellation_token }) in drained { cancellation_token.cancel(); } debug!( - "RecycleConnections: cancelled {} subscriptions", - to_resubscribe.len() + "[client_id={client_id}] canceled {} subscriptions", + drained_len ); - - // Abort and await all watcher tasks and add fresh joinset - debug!("RecycleConnections: aborting watcher tasks"); - let mut old_joinset = { - let mut watchers = subscription_watchers - .lock() - .expect("subscription_watchers lock poisonde"); - std::mem::replace(&mut *watchers, tokio::task::JoinSet::new()) - }; - old_joinset.abort_all(); - while let Some(_res) = old_joinset.join_next().await {} - debug!("RecycleConnections: watcher tasks terminated"); - - // Re-subscribe to all accounts - debug!( - "RecycleConnections: re-subscribing to {} accounts", - to_resubscribe.len() - ); - let commitment_config = pubsub_client_config.commitment_config; - for pk in to_resubscribe { - let (tx, _rx) = oneshot::channel(); - Self::add_sub( - pk, - tx, - subscriptions.clone(), - new_client.clone(), - subscription_watchers.clone(), - subscription_updates_sender.clone(), - commitment_config, - ); - } - - debug!("RecycleConnections: completed"); - - Ok(new_client) + // Use try_send to avoid blocking and naturally coalesce signals + let _ = abort_sender.try_send(()).inspect_err(|err| { + // Channel full is expected when reconnect is already in progress + if !matches!(err, mpsc::error::TrySendError::Full(_)) { + error!( + "[client_id={client_id}] failed to signal connection issue: {err:?}", + ) + } + }); } } diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs index 7624ef752..719d12345 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs @@ -1,10 +1,24 @@ -use std::sync::{Arc, Mutex}; +use std::{ + mem, + sync::{Arc, Mutex}, + time::Duration, +}; +use arc_swap::ArcSwap; use async_trait::async_trait; +use futures_util::{future::BoxFuture, stream::BoxStream}; use log::*; +use solana_account_decoder::UiAccount; use solana_pubkey::Pubkey; +use solana_pubsub_client::nonblocking::pubsub_client::{ + PubsubClient, PubsubClientResult, +}; +use solana_rpc_client_api::{config::RpcAccountInfoConfig, response::Response}; use solana_sdk::commitment_config::CommitmentConfig; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + sync::{mpsc, oneshot, Mutex as AsyncMutex}, + time, +}; use super::{ chain_pubsub_actor::{ @@ -13,6 +27,90 @@ use super::{ errors::RemoteAccountProviderResult, }; +type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; +type SubscribeResult = PubsubClientResult<( + BoxStream<'static, 
Response>, + UnsubscribeFn, +)>; + +const MAX_RECONNECT_ATTEMPTS: usize = 5; +const RECONNECT_ATTEMPT_DELAY: Duration = Duration::from_millis(500); + +pub struct PubSubConnection { + client: ArcSwap, + url: String, + reconnect_guard: AsyncMutex<()>, +} + +impl PubSubConnection { + pub async fn new(url: String) -> RemoteAccountProviderResult { + let client = Arc::new(PubsubClient::new(&url).await?).into(); + let reconnect_guard = AsyncMutex::new(()); + Ok(Self { + client, + url, + reconnect_guard, + }) + } + + pub fn url(&self) -> &str { + &self.url + } + + pub async fn account_subscribe( + &self, + pubkey: &Pubkey, + config: RpcAccountInfoConfig, + ) -> SubscribeResult { + let client = self.client.load(); + let config = Some(config.clone()); + let (stream, unsub) = client.account_subscribe(pubkey, config).await?; + // SAFETY: + // the returned stream depends on the used client, which is only ever dropped + // if the connection has been terminated, at which point the stream is useless + // and will be discarded as well, thus it is safe to extend its lifetime to 'static + let stream = unsafe { + mem::transmute::< + BoxStream<'_, Response>, + BoxStream<'static, Response>, + >(stream) + }; + Ok((stream, unsub)) + } + + pub async fn reconnect(&self) -> PubsubClientResult<()> { + // Prevents multiple reconnect attempts running concurrently + let _guard = match self.reconnect_guard.try_lock() { + Ok(g) => g, + // Reconnect is already in progress + Err(_) => { + // Wait a bit and return to retry subscription + time::sleep(RECONNECT_ATTEMPT_DELAY).await; + return Ok(()); + } + }; + let mut attempt = 1; + let client = loop { + match PubsubClient::new(&self.url).await { + Ok(c) => break Arc::new(c), + Err(error) => { + warn!( + "failed to reconnect to ws endpoint at {} {error}", + self.url + ); + if attempt == MAX_RECONNECT_ATTEMPTS { + return Err(error); + } + attempt += 1; + time::sleep(RECONNECT_ATTEMPT_DELAY).await; + } + } + }; + self.client.store(client); + Ok(()) + } +} + // ----------------- // Trait // ----------------- @@ -27,9 +125,31 @@ pub trait ChainPubsubClient: Send + Sync + Clone + 'static { pubkey: Pubkey, ) -> RemoteAccountProviderResult<()>; async fn shutdown(&self); - async fn recycle_connections(&self); fn take_updates(&self) -> mpsc::Receiver; + + /// Provides the total number of subscriptions and the number of + /// subscriptions when excluding pubkeys in `exclude`. + /// - `exclude`: Optional slice of pubkeys to exclude from the count. + /// Returns a tuple of (total subscriptions, filtered subscriptions). + async fn subscription_count( + &self, + exclude: Option<&[Pubkey]>, + ) -> (usize, usize); + + fn subscriptions(&self) -> Vec; +} + +#[async_trait] +pub trait ReconnectableClient { + /// Attempts to reconnect to the pubsub server and should be invoked when the client sent the + /// abort signal. + async fn try_reconnect(&self) -> RemoteAccountProviderResult<()>; + /// Re-subscribes to multiple accounts after a reconnection.
+ async fn resub_multiple( + &self, + pubkeys: &[Pubkey], + ) -> RemoteAccountProviderResult<()>; } // ----------------- @@ -44,10 +164,15 @@ pub struct ChainPubsubClientImpl { impl ChainPubsubClientImpl { pub async fn try_new_from_url( pubsub_url: &str, + abort_sender: mpsc::Sender<()>, commitment: CommitmentConfig, ) -> RemoteAccountProviderResult { - let (actor, updates) = - ChainPubsubActor::new_from_url(pubsub_url, commitment).await?; + let (actor, updates) = ChainPubsubActor::new_from_url( + pubsub_url, + abort_sender, + commitment, + ) + .await?; Ok(Self { actor: Arc::new(actor), updates_rcvr: Arc::new(Mutex::new(Some(updates))), @@ -61,38 +186,6 @@ impl ChainPubsubClient for ChainPubsubClientImpl { self.actor.shutdown().await; } - async fn recycle_connections(&self) { - // Fire a recycle request to the actor and await the acknowledgement. - // If recycle fails there is nothing the caller could do, so we log an error instead - let (tx, rx) = oneshot::channel(); - if let Err(err) = self - .actor - .send_msg(ChainPubsubActorMessage::RecycleConnections { - response: tx, - }) - .await - { - error!( - "ChainPubsubClientImpl::recycle_connections: failed to send RecycleConnections: {err:?}" - ); - return; - } - let res = match rx.await { - Ok(r) => r, - Err(err) => { - error!( - "ChainPubsubClientImpl::recycle_connections: actor dropped recycle ack: {err:?}" - ); - return; - } - }; - if let Err(err) = res { - error!( - "ChainPubsubClientImpl::recycle_connections: recycle failed: {err:?}" - ); - } - } - fn take_updates(&self) -> mpsc::Receiver { // SAFETY: This can only be None if `take_updates` is called more than // once (double-take). That indicates a logic bug in the calling code. @@ -117,7 +210,10 @@ impl ChainPubsubClient for ChainPubsubClientImpl { }) .await?; - rx.await? + rx.await + .inspect_err(|err| { + warn!("ChainPubsubClientImpl::subscribe - RecvError occurred while awaiting subscription response for {}: {err:?}. This indicates the actor sender was dropped without responding.", pubkey); + })? } async fn unsubscribe( @@ -132,7 +228,53 @@ impl ChainPubsubClient for ChainPubsubClientImpl { }) .await?; - rx.await? + rx.await + .inspect_err(|err| { + warn!("ChainPubsubClientImpl::unsubscribe - RecvError occurred while awaiting unsubscription response for {}: {err:?}. This indicates the actor sender was dropped without responding.", pubkey); + })? + } + + async fn subscription_count( + &self, + exclude: Option<&[Pubkey]>, + ) -> (usize, usize) { + let total = self.actor.subscription_count(&[]); + let filtered = if let Some(exclude) = exclude { + self.actor.subscription_count(exclude) + } else { + total + }; + (total, filtered) + } + + fn subscriptions(&self) -> Vec { + self.actor.subscriptions() + } +} + +#[async_trait] +impl ReconnectableClient for ChainPubsubClientImpl { + async fn try_reconnect(&self) -> RemoteAccountProviderResult<()> { + let (tx, rx) = oneshot::channel(); + self.actor + .send_msg(ChainPubsubActorMessage::Reconnect { response: tx }) + .await?; + + rx.await.inspect_err(|err| { + warn!("RecvError occurred while awaiting reconnect response: {err:?}."); + })? 
+ } + + async fn resub_multiple( + &self, + pubkeys: &[Pubkey], + ) -> RemoteAccountProviderResult<()> { + for &pubkey in pubkeys { + self.subscribe(pubkey).await?; + // Don't spam the RPC provider - for 5,000 accounts we would take 250 secs = ~4 minutes + tokio::time::sleep(Duration::from_millis(50)).await; + } + Ok(()) } } @@ -141,13 +283,7 @@ impl ChainPubsubClient for ChainPubsubClientImpl { // ----------------- #[cfg(any(test, feature = "dev-context"))] pub mod mock { - use std::{ - collections::HashSet, - sync::{ - atomic::{AtomicU64, Ordering}, - Mutex, - }, - }; + use std::{collections::HashSet, sync::Mutex, time::Duration}; use log::*; use solana_account::Account; @@ -158,13 +294,17 @@ pub mod mock { use solana_sdk::clock::Slot; use super::*; + use crate::remote_account_provider::{ + RemoteAccountProviderError, RemoteAccountProviderResult, + }; #[derive(Clone)] pub struct ChainPubsubClientMock { updates_sndr: mpsc::Sender, updates_rcvr: Arc>>>, subscribed_pubkeys: Arc>>, - recycle_calls: Arc, + connected: Arc>, + pending_resubscribe_failures: Arc>, } impl ChainPubsubClientMock { @@ -176,12 +316,20 @@ pub mod mock { updates_sndr, updates_rcvr: Arc::new(Mutex::new(Some(updates_rcvr))), subscribed_pubkeys: Arc::new(Mutex::new(HashSet::new())), - recycle_calls: Arc::new(AtomicU64::new(0)), + connected: Arc::new(Mutex::new(true)), + pending_resubscribe_failures: Arc::new(Mutex::new(0)), } } - pub fn recycle_calls(&self) -> u64 { - self.recycle_calls.load(Ordering::SeqCst) + /// Simulate a disconnect: clear all subscriptions and mark client as disconnected. + pub fn simulate_disconnect(&self) { + *self.connected.lock().unwrap() = false; + self.subscribed_pubkeys.lock().unwrap().clear(); + } + + /// Fail the next N resubscription attempts in resub_multiple(). + pub fn fail_next_resubscriptions(&self, n: usize) { + *self.pending_resubscribe_failures.lock().unwrap() = n; } async fn send(&self, update: SubscriptionUpdate) { @@ -225,10 +373,6 @@ pub mod mock { #[async_trait] impl ChainPubsubClient for ChainPubsubClientMock { - async fn recycle_connections(&self) { - self.recycle_calls.fetch_add(1, Ordering::SeqCst); - } - fn take_updates(&self) -> mpsc::Receiver { // SAFETY: This can only be None if `take_updates` is called more // than once (double take). 
That would indicate a logic bug in the @@ -242,6 +386,13 @@ pub mod mock { &self, pubkey: Pubkey, ) -> RemoteAccountProviderResult<()> { + if !*self.connected.lock().unwrap() { + return Err( + RemoteAccountProviderError::AccountSubscriptionsFailed( + "mock: subscribe while disconnected".to_string(), + ), + ); + } let mut subscribed_pubkeys = self.subscribed_pubkeys.lock().unwrap(); subscribed_pubkeys.insert(pubkey); @@ -259,5 +410,60 @@ pub mod mock { } async fn shutdown(&self) {} + + async fn subscription_count( + &self, + exclude: Option<&[Pubkey]>, + ) -> (usize, usize) { + let pubkeys: Vec = { + let subs = self.subscribed_pubkeys.lock().unwrap(); + subs.iter().cloned().collect() + }; + let total = pubkeys.len(); + let exclude = exclude.unwrap_or_default(); + let filtered = pubkeys + .iter() + .filter(|pubkey| !exclude.contains(pubkey)) + .count(); + (total, filtered) + } + + fn subscriptions(&self) -> Vec { + let subs = self.subscribed_pubkeys.lock().unwrap(); + subs.iter().copied().collect() + } + } + + #[async_trait] + impl ReconnectableClient for ChainPubsubClientMock { + async fn try_reconnect(&self) -> RemoteAccountProviderResult<()> { + *self.connected.lock().unwrap() = true; + Ok(()) + } + + async fn resub_multiple( + &self, + pubkeys: &[Pubkey], + ) -> RemoteAccountProviderResult<()> { + // Simulate transient resubscription failures + { + let mut to_fail = + self.pending_resubscribe_failures.lock().unwrap(); + if *to_fail > 0 { + *to_fail -= 1; + return Err( + RemoteAccountProviderError::AccountSubscriptionsFailed( + "mock: forced resubscribe failure".to_string(), + ), + ); + } + } + for &pubkey in pubkeys { + self.subscribe(pubkey).await?; + // keep it small; tests shouldn't take long + tokio::time::sleep(Duration::from_millis(10)).await; + } + Ok(()) + } } } diff --git a/magicblock-chainlink/src/remote_account_provider/config.rs b/magicblock-chainlink/src/remote_account_provider/config.rs index be2aa0f1a..98f063df1 100644 --- a/magicblock-chainlink/src/remote_account_provider/config.rs +++ b/magicblock-chainlink/src/remote_account_provider/config.rs @@ -9,12 +9,25 @@ pub const DEFAULT_SUBSCRIBED_ACCOUNTS_LRU_CAPACITY: usize = 10_000; pub struct RemoteAccountProviderConfig { subscribed_accounts_lru_capacity: usize, lifecycle_mode: LifecycleMode, + enable_subscription_metrics: bool, } impl RemoteAccountProviderConfig { pub fn try_new( subscribed_accounts_lru_capacity: usize, lifecycle_mode: LifecycleMode, + ) -> RemoteAccountProviderResult { + Self::try_new_with_metrics( + subscribed_accounts_lru_capacity, + lifecycle_mode, + true, + ) + } + + pub fn try_new_with_metrics( + subscribed_accounts_lru_capacity: usize, + lifecycle_mode: LifecycleMode, + enable_subscription_metrics: bool, ) -> RemoteAccountProviderResult { if subscribed_accounts_lru_capacity == 0 { return Err(RemoteAccountProviderError::InvalidLruCapacity( @@ -24,6 +37,7 @@ impl RemoteAccountProviderConfig { Ok(Self { subscribed_accounts_lru_capacity, lifecycle_mode, + enable_subscription_metrics, }) } @@ -41,6 +55,10 @@ impl RemoteAccountProviderConfig { pub fn subscribed_accounts_lru_capacity(&self) -> usize { self.subscribed_accounts_lru_capacity } + + pub fn enable_subscription_metrics(&self) -> bool { + self.enable_subscription_metrics + } } impl Default for RemoteAccountProviderConfig { @@ -49,6 +67,7 @@ impl Default for RemoteAccountProviderConfig { subscribed_accounts_lru_capacity: DEFAULT_SUBSCRIBED_ACCOUNTS_LRU_CAPACITY, lifecycle_mode: LifecycleMode::default(), + enable_subscription_metrics: true, } } } 
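Not part of the patch above: a minimal usage sketch of how the two config constructors introduced in config.rs relate. The import paths are assumed from the crate layout and may differ from the actual re-exports.

// Sketch only: `try_new` keeps subscription metrics enabled (the production default),
// while the tests in this diff pass `false` via `try_new_with_metrics` so no metrics
// gauge updater task is spawned.
// NOTE: the module paths below are assumptions, not verified against the crate.
use magicblock_chainlink::remote_account_provider::{
    config::RemoteAccountProviderConfig, LifecycleMode,
};

fn config_examples() {
    // Production-style config: metrics on by default.
    let prod =
        RemoteAccountProviderConfig::try_new(10_000, LifecycleMode::Ephemeral)
            .expect("non-zero LRU capacity");
    assert!(prod.enable_subscription_metrics());

    // Test-style config mirroring the updated tests: metrics disabled.
    let test = RemoteAccountProviderConfig::try_new_with_metrics(
        1_000,
        LifecycleMode::Ephemeral,
        false,
    )
    .expect("non-zero LRU capacity");
    assert!(!test.enable_subscription_metrics());

    // Both constructors reject a zero LRU capacity.
    assert!(
        RemoteAccountProviderConfig::try_new(0, LifecycleMode::Ephemeral)
            .is_err()
    );
}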
diff --git a/magicblock-chainlink/src/remote_account_provider/lru_cache.rs b/magicblock-chainlink/src/remote_account_provider/lru_cache.rs index 6143026b2..1c96b9022 100644 --- a/magicblock-chainlink/src/remote_account_provider/lru_cache.rs +++ b/magicblock-chainlink/src/remote_account_provider/lru_cache.rs @@ -113,6 +113,26 @@ impl AccountsLruCache { false } } + + pub fn len(&self) -> usize { + let subs = self + .subscribed_accounts + .lock() + .expect("subscribed_accounts lock poisoned"); + subs.len() + } + + pub fn never_evicted_accounts(&self) -> Vec { + self.accounts_to_never_evict.iter().cloned().collect() + } + + pub fn pubkeys(&self) -> Vec { + let subs = self + .subscribed_accounts + .lock() + .expect("subscribed_accounts lock poisoned"); + subs.iter().map(|(k, _)| *k).collect() + } } #[cfg(test)] @@ -237,4 +257,14 @@ mod tests { assert_eq!(evicted, Some(expected_evicted)); } } + + #[test] + fn test_never_evicted_accounts() { + let capacity = NonZeroUsize::new(3).unwrap(); + let cache = AccountsLruCache::new(capacity); + + let never_evicted = cache.never_evicted_accounts(); + // Should contain at least the clock sysvar + assert!(never_evicted.contains(&sysvar::clock::id())); + } } diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 2daef9c1b..a93692181 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -1,11 +1,10 @@ use std::{ - collections::HashMap, + collections::{hash_map::Entry, HashMap, HashSet}, num::NonZeroUsize, sync::{ atomic::{AtomicU64, Ordering}, Arc, Mutex, }, - time::Duration, }; pub(crate) use chain_pubsub_client::{ @@ -33,7 +32,8 @@ use solana_rpc_client_api::{ use solana_sdk::{commitment_config::CommitmentConfig, sysvar::clock}; use tokio::{ sync::{mpsc, oneshot}, - task::{self, JoinSet}, + task, + time::{self, Duration}, }; pub(crate) mod chain_pubsub_actor; @@ -46,14 +46,17 @@ pub mod program_account; mod remote_account; pub use chain_pubsub_actor::SubscriptionUpdate; +use magicblock_metrics::metrics::set_monitored_accounts_count; pub use remote_account::{ResolvedAccount, ResolvedAccountSharedData}; use crate::{errors::ChainlinkResult, submux::SubMuxClient}; -// Simple tracking for accounts currently being fetched to handle race conditions +const ACTIVE_SUBSCRIPTIONS_UPDATE_INTERVAL_MS: u64 = 60_000; + // Maps pubkey -> (fetch_start_slot, requests_waiting) +type FetchResult = Result; type FetchingAccounts = - Mutex>)>>; + Mutex>)>>; pub struct ForwardedSubscriptionUpdate { pub pubkey: Pubkey, @@ -84,7 +87,7 @@ pub struct RemoteAccountProvider { received_updates_count: Arc, /// Tracks which accounts are currently subscribed to - subscribed_accounts: AccountsLruCache, + lrucache_subscribed_accounts: Arc, /// Channel to notify when an account is removed from the cache and thus no /// longer being watched @@ -94,6 +97,9 @@ pub struct RemoteAccountProvider { removed_account_rx: Mutex>>, subscription_forwarder: Arc>, + + /// Task that periodically updates the active subscriptions gauge + _active_subscriptions_task_handle: Option>, } // ----------------- @@ -184,6 +190,79 @@ impl RemoteAccountProvider { Ok(None) } } + + /// Creates a background task that periodically updates the active subscriptions gauge + fn start_active_subscriptions_updater( + subscribed_accounts: Arc, + pubsub_client: Arc, + ) -> task::JoinHandle<()> { + task::spawn(async move { + let mut interval = 
time::interval(Duration::from_millis( + ACTIVE_SUBSCRIPTIONS_UPDATE_INTERVAL_MS, + )); + let never_evicted = subscribed_accounts.never_evicted_accounts(); + + loop { + interval.tick().await; + let lru_count = subscribed_accounts.len(); + let (pubsub_total, pubsub_without_never_evict) = pubsub_client + .subscription_count(Some(&never_evicted)) + .await; + + let all_pubsub_subs = if log::log_enabled!(log::Level::Debug) { + pubsub_client.subscriptions() + } else { + vec![] + }; + if lru_count != pubsub_without_never_evict { + warn!( + "User account subscription counts LRU cache={} pubsub client={} don't match", + lru_count, pubsub_without_never_evict + ); + if log::log_enabled!(log::Level::Debug) { + // Log all pubsub subscriptions for debugging + trace!( + "All pubsub subscriptions: {:?}", + all_pubsub_subs + ); + + // Find extra keys in pubsub that are not in LRU cache + let lru_pubkeys = subscribed_accounts.pubkeys(); + let pubsub_subs_without_never_evict: HashSet<_> = + all_pubsub_subs + .iter() + .filter(|pk| !never_evicted.contains(pk)) + .copied() + .collect(); + let lru_pubkeys_set: HashSet<_> = + lru_pubkeys.into_iter().collect(); + + let extra_in_pubsub: Vec<_> = + pubsub_subs_without_never_evict + .difference(&lru_pubkeys_set) + .cloned() + .collect(); + let extra_in_lru: Vec<_> = lru_pubkeys_set + .difference(&pubsub_subs_without_never_evict) + .cloned() + .collect(); + + if !extra_in_pubsub.is_empty() { + debug!("Extra pubkeys in pubsub client not in LRU cache: {:?}", extra_in_pubsub); + } + if !extra_in_lru.is_empty() { + debug!("Extra pubkeys in LRU cache not in pubsub client: {:?}", extra_in_lru); + } + } + } + + debug!("Updating active subscriptions: count={}", pubsub_total); + trace!("All subscriptions: {}", pubkeys_str(&all_pubsub_subs)); + set_monitored_accounts_count(pubsub_total); + } + }) + } + /// Creates a new instance of the remote account provider /// By the time this method returns the current chain slot was resolved and /// a subscription setup to keep it up to date. @@ -195,6 +274,24 @@ impl RemoteAccountProvider { ) -> RemoteAccountProviderResult { let (removed_account_tx, removed_account_rx) = tokio::sync::mpsc::channel(100); + let subscribed_accounts = Arc::new(AccountsLruCache::new({ + // SAFETY: NonZeroUsize::new only returns None if the value is 0. + // RemoteAccountProviderConfig can only be constructed with + // capacity > 0 + let cap = config.subscribed_accounts_lru_capacity(); + NonZeroUsize::new(cap).expect("non-zero capacity") + })); + + let active_subscriptions_updater = + if config.enable_subscription_metrics() { + Some(Self::start_active_subscriptions_updater( + subscribed_accounts.clone(), + Arc::new(pubsub_client.clone()), + )) + } else { + None + }; + let me = Self { fetching_accounts: Arc::::default(), rpc_client, @@ -202,16 +299,11 @@ impl RemoteAccountProvider { chain_slot: Arc::::default(), last_update_slot: Arc::::default(), received_updates_count: Arc::::default(), - subscribed_accounts: AccountsLruCache::new({ - // SAFETY: NonZeroUsize::new only returns None if the value is 0. 
- // RemoteAccountProviderConfig can only be constructed with - // capacity > 0 - let cap = config.subscribed_accounts_lru_capacity(); - NonZeroUsize::new(cap).expect("non-zero capacity") - }), + lrucache_subscribed_accounts: subscribed_accounts.clone(), subscription_forwarder: Arc::new(subscription_forwarder), removed_account_tx, removed_account_rx: Mutex::new(Some(removed_account_rx)), + _active_subscriptions_task_handle: active_subscriptions_updater, }; let updates = me.pubsub_client.take_updates(); @@ -257,15 +349,17 @@ impl RemoteAccountProvider { }; // Build pubsub clients and wrap them into a SubMuxClient - let mut pubsubs: Vec> = + let mut pubsubs: Vec<(Arc, mpsc::Receiver<()>)> = Vec::with_capacity(endpoints.len()); for ep in endpoints { + let (abort_tx, abort_rx) = mpsc::channel(1); let client = ChainPubsubClientImpl::try_new_from_url( ep.pubsub_url.as_str(), + abort_tx, commitment, ) .await?; - pubsubs.push(Arc::new(client)); + pubsubs.push((Arc::new(client), abort_rx)); } let submux = SubMuxClient::new(pubsubs, None); @@ -277,7 +371,7 @@ impl RemoteAccountProvider { } pub(crate) fn promote_accounts(&self, pubkeys: &[&Pubkey]) { - self.subscribed_accounts.promote_multi(pubkeys); + self.lrucache_subscribed_accounts.promote_multi(pubkeys); } pub fn try_get_removed_account_rx( @@ -361,7 +455,8 @@ impl RemoteAccountProvider { // Resolve all pending requests with subscription data for sender in pending_requests { - let _ = sender.send(remote_account.clone()); + let _ = + sender.send(Ok(remote_account.clone())); } None } else { @@ -518,8 +613,8 @@ impl RemoteAccountProvider { return Ok(vec![]); } - if log_enabled!(log::Level::Debug) { - debug!("Fetching accounts: [{}]", pubkeys_str(pubkeys)); + if log_enabled!(log::Level::Trace) { + trace!("Fetching accounts: [{}]", pubkeys_str(pubkeys)); } // Create channels for potential subscription updates to override fetch results @@ -530,7 +625,14 @@ impl RemoteAccountProvider { let mut fetching = self.fetching_accounts.lock().unwrap(); for &pubkey in pubkeys { let (sender, receiver) = oneshot::channel(); - fetching.insert(pubkey, (fetch_start_slot, vec![sender])); + match fetching.entry(pubkey) { + Entry::Occupied(mut entry) => { + entry.get_mut().1.push(sender); + } + Entry::Vacant(entry) => { + entry.insert((fetch_start_slot, vec![sender])); + } + } subscription_overrides.push((pubkey, receiver)); } } @@ -550,10 +652,23 @@ impl RemoteAccountProvider { subscription_overrides.into_iter().enumerate() { match receiver.await { - Ok(remote_account) => resolved_accounts.push(remote_account), + Ok(result) => match result { + Ok(remote_account) => { + resolved_accounts.push(remote_account) + } + Err(err) => { + error!("Failed to fetch account {pubkey}: {err}"); + errors.push((idx, err)); + } + }, Err(err) => { + warn!("RemoteAccountProvider::try_get_multi - Unexpected RecvError while awaiting account {pubkey} at index {idx}: {err:?}. This should not happen with Result-based channels. 
Context: fetch_start_slot={fetch_start_slot}, min_context_slot={min_context_slot}, total_pubkeys={}", + pubkeys.len()); error!("Failed to resolve account {pubkey}: {err:?}"); - errors.push((idx, err)); + errors.push(( + idx, + RemoteAccountProviderError::RecvrError(err), + )); } } } @@ -586,7 +701,7 @@ impl RemoteAccountProvider { async fn setup_subscriptions( &self, - subscribe_and_fetch: &[(Pubkey, oneshot::Receiver)], + subscribe_and_fetch: &[(Pubkey, oneshot::Receiver)], ) -> RemoteAccountProviderResult<()> { if log_enabled!(log::Level::Debug) { let pubkeys = subscribe_and_fetch @@ -594,58 +709,13 @@ impl RemoteAccountProvider { .map(|(pk, _)| pk.to_string()) .collect::>() .join(", "); - debug!("Subscribing to accounts: {pubkeys}"); + trace!("Subscribing to accounts: {pubkeys}"); } - let subscription_results = { - let mut set = JoinSet::new(); - for (pubkey, _) in subscribe_and_fetch.iter() { - let pc = self.pubsub_client.clone(); - let pubkey = *pubkey; - set.spawn(async move { pc.subscribe(pubkey).await }); - } - set - } - .join_all() - .await; - - let (new_subs, errs) = subscription_results - .into_iter() - .enumerate() - .fold((vec![], vec![]), |(mut new_subs, mut errs), (idx, res)| { - match res { - Ok(_) => { - if let Some((pubkey, _)) = subscribe_and_fetch.get(idx) - { - new_subs.push(pubkey); - } - } - Err(err) => errs.push((idx, err)), - } - (new_subs, errs) - }); - - if errs.is_empty() { - for pubkey in new_subs { - // Register the subscription for the pubkey - self.register_subscription(pubkey).await?; - } - Ok(()) - } else { - Err(RemoteAccountProviderError::AccountSubscriptionsFailed( - errs.iter() - .map(|(idx, err)| { - let pubkey = subscribe_and_fetch - .get(*idx) - .map(|(pk, _)| pk.to_string()) - .unwrap_or_else(|| { - "BUG: could not match pubkey".to_string() - }); - format!("{pubkey}: {err:?}") - }) - .collect::>() - .join(",\n"), - )) + for (pubkey, _) in subscribe_and_fetch.iter() { + // Register the subscription for the pubkey (handles LRU cache and eviction first) + self.register_subscription(pubkey).await?; } + Ok(()) } /// Registers a new subscription for the given pubkey. @@ -655,15 +725,29 @@ impl RemoteAccountProvider { ) -> RemoteAccountProviderResult<()> { // If an account is evicted then we need to unsubscribe from it first // and then inform upstream that we are no longer tracking it - if let Some(evicted) = self.subscribed_accounts.add(*pubkey) { + if let Some(evicted) = self.lrucache_subscribed_accounts.add(*pubkey) { trace!("Evicting {pubkey}"); - // 1. Unsubscribe from the account - self.unsubscribe(&evicted).await?; + // 1. Unsubscribe from the account directly (LRU has already removed it) + if let Err(err) = self.pubsub_client.unsubscribe(evicted).await { + warn!( + "Failed to unsubscribe from pubsub for evicted account {evicted}: {err:?}"); + // Rollback the LRU add since eviction failed + self.lrucache_subscribed_accounts.remove(pubkey); + return Err(err); + } // 2. Inform upstream so it can remove it from the store self.send_removal_update(evicted).await?; } + + // 3. Subscribe to the new account (only after successful eviction handling) + if let Err(err) = self.pubsub_client.subscribe(*pubkey).await { + // Rollback the LRU add since subscription failed + self.lrucache_subscribed_accounts.remove(pubkey); + return Err(err); + } + Ok(()) } @@ -681,7 +765,7 @@ impl RemoteAccountProvider { /// This does not consider accounts like the clock sysvar that are watched as /// part of the provider's internal logic. 
pub fn is_watching(&self, pubkey: &Pubkey) -> bool { - self.subscribed_accounts.contains(pubkey) + self.lrucache_subscribed_accounts.contains(pubkey) } /// Check if an account is currently pending (being fetched) @@ -699,9 +783,7 @@ impl RemoteAccountProvider { return Ok(()); } - self.subscribed_accounts.add(*pubkey); - self.pubsub_client.subscribe(*pubkey).await?; - + self.register_subscription(pubkey).await?; Ok(()) } @@ -710,10 +792,27 @@ impl RemoteAccountProvider { &self, pubkey: &Pubkey, ) -> RemoteAccountProviderResult<()> { - // Only maintain subscriptions if we were actually subscribed - if self.subscribed_accounts.remove(pubkey) { - self.pubsub_client.unsubscribe(*pubkey).await?; - self.send_removal_update(*pubkey).await?; + if !self.lrucache_subscribed_accounts.contains(pubkey) { + warn!( + "Tried to unsubscribe from account {} that was not subscribed in the LRU cache", + pubkey + ); + return Ok(()); + } + + match self.pubsub_client.unsubscribe(*pubkey).await { + Ok(()) => { + // Only remove from LRU cache after successful pubsub unsubscribe + self.lrucache_subscribed_accounts.remove(pubkey); + self.send_removal_update(*pubkey).await?; + } + Err(err) => { + warn!( + "Failed to unsubscribe from pubsub for {pubkey}: {err:?}" + ); + // Don't remove from LRU cache if pubsub unsubscribe failed + // This ensures LRU cache and pubsub client stay in sync + } } Ok(()) @@ -732,19 +831,6 @@ impl RemoteAccountProvider { min_context_slot: u64, ) { const MAX_RETRIES: u64 = 10; - let mut remaining_retries: u64 = MAX_RETRIES; - macro_rules! retry { - ($msg:expr) => { - trace!($msg); - remaining_retries -= 1; - if remaining_retries <= 0 { - error!("Max retries {MAX_RETRIES} reached, giving up on fetching accounts: {pubkeys:?}"); - return; - } - tokio::time::sleep(Duration::from_millis(400)).await; - continue; - } - } let rpc_client = self.rpc_client.clone(); let fetching_accounts = self.fetching_accounts.clone(); @@ -754,10 +840,42 @@ impl RemoteAccountProvider { tokio::spawn(async move { use RemoteAccount::*; - if log_enabled!(log::Level::Debug) { - debug!("Fetch ({})", pubkeys_str(&pubkeys)); + // Helper to notify all pending requests of fetch failure + let notify_error = |error_msg: &str| { + let mut fetching = fetching_accounts.lock().unwrap(); + error!("{error_msg}"); + for pubkey in &pubkeys { + // Remove pending requests and send error + if let Some((_, requests)) = fetching.remove(pubkey) { + for sender in requests { + let error = RemoteAccountProviderError::AccountResolutionsFailed( + format!("{}: {}", pubkey, error_msg) + ); + let _ = sender.send(Err(error)); + } + } + } + }; + + let mut remaining_retries: u64 = MAX_RETRIES; + + if log_enabled!(log::Level::Trace) { + trace!("Fetch ({})", pubkeys_str(&pubkeys)); } + macro_rules! retry { + ($msg:expr) => {{ + trace!($msg); + remaining_retries -= 1; + if remaining_retries <= 0 { + let err_msg = format!("Max retries {MAX_RETRIES} reached, giving up on fetching accounts: {pubkeys:?}"); + notify_error(&err_msg); + return; + } + tokio::time::sleep(Duration::from_millis(400)).await; + continue; + }}; + } let response = loop { // We provide the min_context slot in order to _force_ the RPC to update // its account cache. 
Otherwise we could just keep fetching the accounts @@ -815,25 +933,28 @@ impl RemoteAccountProvider { message, data, }; - error!( + let err_msg = format!( "RpcError fetching accounts {}: {err:?}", pubkeys_str(&pubkeys) ); + notify_error(&err_msg); return; } } err => { - error!( + let err_msg = format!( "RpcError fetching accounts {}: {err:?}", pubkeys_str(&pubkeys) ); + notify_error(&err_msg); return; } } } _ => { - error!( + let err_msg = format!( "RpcError fetching accounts {}: {err:?}", pubkeys_str(&pubkeys) ); + notify_error(&err_msg); return; } }, @@ -903,7 +1024,7 @@ impl RemoteAccountProvider { // Send the fetch result to all waiting requests for request in requests { - let _ = request.send(remote_account.clone()); + let _ = request.send(Ok(remote_account.clone())); } } }); @@ -1005,11 +1126,17 @@ mod test { let pubsub_client = chain_pubsub_client::mock::ChainPubsubClientMock::new(tx, rx); let (fwd_tx, _fwd_rx) = mpsc::channel(100); + let config = RemoteAccountProviderConfig::try_new_with_metrics( + 1000, + LifecycleMode::Ephemeral, + false, + ) + .unwrap(); RemoteAccountProvider::new( rpc_client, pubsub_client, fwd_tx, - &RemoteAccountProviderConfig::default(), + &config, ) .await .unwrap() @@ -1049,11 +1176,18 @@ mod test { ( { let (fwd_tx, _fwd_rx) = mpsc::channel(100); + let config = + RemoteAccountProviderConfig::try_new_with_metrics( + 1000, + LifecycleMode::Ephemeral, + false, + ) + .unwrap(); RemoteAccountProvider::new( rpc_client.clone(), pubsub_client, fwd_tx, - &RemoteAccountProviderConfig::default(), + &config, ) .await .unwrap() @@ -1121,12 +1255,18 @@ mod test { let pubsub_client = ChainPubsubClientMock::new(tx, rx); let (forward_tx, forward_rx) = mpsc::channel(100); + let config = RemoteAccountProviderConfig::try_new_with_metrics( + 1000, + LifecycleMode::Ephemeral, + false, + ) + .unwrap(); ( RemoteAccountProvider::new( rpc_client, pubsub_client, forward_tx, - &RemoteAccountProviderConfig::default(), + &config, ) .await .unwrap(), @@ -1321,9 +1461,10 @@ mod test { rpc_client, pubsub_client, forward_tx, - &RemoteAccountProviderConfig::try_new( + &RemoteAccountProviderConfig::try_new_with_metrics( accounts_capacity, LifecycleMode::Ephemeral, + false, ) .unwrap(), ) diff --git a/magicblock-chainlink/src/submux/mod.rs b/magicblock-chainlink/src/submux/mod.rs index 96ba10318..8c4e1f9d6 100644 --- a/magicblock-chainlink/src/submux/mod.rs +++ b/magicblock-chainlink/src/submux/mod.rs @@ -11,14 +11,14 @@ use solana_pubkey::Pubkey; use tokio::sync::mpsc; use crate::remote_account_provider::{ - chain_pubsub_client::ChainPubsubClient, - errors::RemoteAccountProviderResult, SubscriptionUpdate, + chain_pubsub_client::{ChainPubsubClient, ReconnectableClient}, + errors::RemoteAccountProviderResult, + SubscriptionUpdate, }; const SUBMUX_OUT_CHANNEL_SIZE: usize = 5_000; const DEDUP_WINDOW_MILLIS: u64 = 2_000; const DEBOUNCE_INTERVAL_MILLIS: u64 = 2_000; -const DEFAULT_RECYCLE_INTERVAL_MILLIS: u64 = 3_600_000; mod debounce_state; pub use self::debounce_state::DebounceState; @@ -97,7 +97,10 @@ pub struct DebounceConfig { /// - While waiting for eligibility in Enabled state, only the latest /// observed update is kept as pending so that the consumer receives /// the freshest state when the interval elapses. -pub struct SubMuxClient { +pub struct SubMuxClient +where + T: ChainPubsubClient + ReconnectableClient, +{ /// Underlying pubsub clients this mux controls and forwards to/from. 
clients: Vec>, /// Aggregated outgoing channel used by forwarder tasks to deliver @@ -128,20 +131,6 @@ pub struct SubMuxClient { never_debounce: HashSet, } -/// Configuration for SubMuxClient -#[derive(Debug, Clone, Default)] -pub struct SubMuxClientConfig { - /// The deduplication window in milliseconds. - pub dedupe_window_millis: Option, - /// The debounce interval in milliseconds. - pub debounce_interval_millis: Option, - /// The debounce detection window in milliseconds. - pub debounce_detection_window_millis: Option, - /// Interval (millis) at which to recycle inner client connections. - /// If None, defaults to DEFAULT_RECYCLE_INTERVAL_MILLIS. - pub recycle_interval_millis: Option, -} - // Parameters for the long-running forwarder loop, grouped to avoid // clippy::too_many_arguments and to keep spawn sites concise. struct ForwarderParams { @@ -154,9 +143,9 @@ struct ForwarderParams { allowed_count: usize, } -impl SubMuxClient { +impl SubMuxClient { pub fn new( - clients: Vec>, + clients: Vec<(Arc, mpsc::Receiver<()>)>, dedupe_window_millis: Option, ) -> Self { Self::new_with_debounce( @@ -169,16 +158,15 @@ impl SubMuxClient { } pub fn new_with_debounce( - clients: Vec>, + clients: Vec<(Arc, mpsc::Receiver<()>)>, config: DebounceConfig, ) -> Self { - Self::new_with_configs(clients, config, SubMuxClientConfig::default()) + Self::new_with_config(clients, config) } - pub fn new_with_configs( - clients: Vec>, + pub fn new_with_config( + clients: Vec<(Arc, mpsc::Receiver<()>)>, config: DebounceConfig, - mux_config: SubMuxClientConfig, ) -> Self { let (out_tx, out_rx) = mpsc::channel(SUBMUX_OUT_CHANNEL_SIZE); let dedup_cache = Arc::new(Mutex::new(HashMap::new())); @@ -197,6 +185,8 @@ impl SubMuxClient { let never_debounce: HashSet = vec![solana_sdk::sysvar::clock::ID].into_iter().collect(); + let clients = Self::spawn_reconnectors(clients); + let me = Self { clients, out_tx, @@ -212,10 +202,95 @@ impl SubMuxClient { // Spawn background tasks me.spawn_dedup_pruner(); me.spawn_debounce_flusher(); - me.maybe_spawn_connection_recycler(mux_config.recycle_interval_millis); me } + // ----------------- + // Reconnection + // ----------------- + fn spawn_reconnectors( + clients: Vec<(Arc, mpsc::Receiver<()>)>, + ) -> Vec> { + let clients_only = clients + .iter() + .map(|(c, _)| c.clone()) + .collect::>>(); + for (client, mut abort_rx) in clients.into_iter() { + let clients_clone = clients_only.clone(); + tokio::spawn(async move { + while abort_rx.recv().await.is_some() { + // Drain any duplicate abort signals to coalesce reconnect attempts + while abort_rx.try_recv().is_ok() {} + + debug!( + "Reconnecter received abort signal, reconnecting client" + ); + Self::reconnect_client_with_backoff( + client.clone(), + clients_clone.clone(), + ) + .await; + } + }); + } + clients_only + } + + async fn reconnect_client_with_backoff( + client: Arc, + all_clients: Vec>, + ) { + fn fib_with_max(n: u64) -> u64 { + let (mut a, mut b) = (0u64, 1u64); + for _ in 0..n { + (a, b) = (b, a.saturating_add(b)); + } + a.min(600) + } + + const WARN_EVERY_ATTEMPTS: u64 = 10; + let mut attempt = 0; + loop { + attempt += 1; + if Self::reconnect_client(client.clone(), &all_clients).await { + debug!( + "Successfully reconnected client after {} attempts", + attempt + ); + break; + } else { + if attempt % WARN_EVERY_ATTEMPTS == 0 { + error!("Failed to reconnect ({}) times", attempt); + } + let wait_duration = Duration::from_secs(fib_with_max(attempt)); + tokio::time::sleep(wait_duration).await; + debug!("Reconnect attempt {} 
failed, will retry", attempt); + } + } + } + + async fn reconnect_client(client: Arc, all_clients: &[Arc]) -> bool { + if let Err(err) = client.try_reconnect().await { + debug!("Failed to reconnect client: {:?}", err); + return false; + } + // Resubscribe all existing subscriptions sourced from still connected clients + // NOTE: that new subscriptions are already received now as well since the + // client marked itself as connected and is no longer blocking subscriptions + // See [ChainPubsubActor::handle_msg] and [ChainPubsubActor::try_reconnect] + let subs = Self::get_subscriptions(all_clients); + match client.resub_multiple(&subs).await { + Err(err) => { + debug!( + "Failed to resubscribe accounts after reconnect: {:?}", + err + ); + false + } + Ok(_) => true, + } + } + fn spawn_dedup_pruner(&self) { let window = self.dedup_window; let cache = self.dedup_cache.clone(); @@ -277,34 +352,6 @@ impl SubMuxClient { }); } - fn maybe_spawn_connection_recycler( - &self, - recycle_interval_millis: Option, - ) { - // Disabled when the interval is explicitly Some(0) - if recycle_interval_millis == Some(0) { - return; - } - let recycle_clients = self.clients.clone(); - let interval = Duration::from_millis( - recycle_interval_millis.unwrap_or(DEFAULT_RECYCLE_INTERVAL_MILLIS), - ); - tokio::spawn(async move { - let mut idx: usize = 0; - loop { - tokio::time::sleep(interval).await; - if recycle_clients.is_empty() { - continue; - } - let len = recycle_clients.len(); - let i = idx % len; - idx = (idx + 1) % len; - let client = recycle_clients[i].clone(); - client.recycle_connections().await; - } - }); - } - fn start_forwarders(&self) { let window = self.dedup_window; let debounce_interval = self.debounce_interval; @@ -499,6 +546,14 @@ impl SubMuxClient { maybe_forward_now } + fn get_subscriptions(clients: &[Arc]) -> Vec { + let mut all_subs = HashSet::new(); + for client in clients { + all_subs.extend(client.subscriptions()); + } + all_subs.into_iter().collect() + } + fn allowed_in_debounce_window_count(&self) -> usize { (self.debounce_detection_window.as_millis() / self.debounce_interval.as_millis()) as usize @@ -515,15 +570,10 @@ impl SubMuxClient { } #[async_trait] -impl ChainPubsubClient for SubMuxClient { - async fn recycle_connections(&self) { - // This recycles all inner clients which may not always make - // sense. Thus we don't expect this call on the Multiplexer itself. - for client in &self.clients { - client.recycle_connections().await; - } - } - +impl ChainPubsubClient for SubMuxClient +where + T: ChainPubsubClient + ReconnectableClient, +{ async fn subscribe( &self, pubkey: Pubkey, @@ -563,6 +613,34 @@ impl ChainPubsubClient for SubMuxClient { self.start_forwarders(); out_rx } + + /// Gets the maximum subscription count across all inner clients. + /// NOTE: one of the clients could be reconnecting and thus + /// temporarily have fewer or no subscriptions + async fn subscription_count( + &self, + exclude: Option<&[Pubkey]>, + ) -> (usize, usize) { + let mut max_total = 0; + let mut max_filtered = 0; + for client in &self.clients { + let (total, filtered) = client.subscription_count(exclude).await; + if total > max_total { + max_total = total; + } + if filtered > max_filtered { + max_filtered = filtered; + } + } + (max_total, max_filtered) + } + + /// Gets the union of all subscriptions across all inner clients. + /// Unless one is reconnecting, this should be identical to + /// getting it from a single inner client. 
+ fn subscriptions(&self) -> Vec { + Self::get_subscriptions(&self.clients) + } } #[cfg(test)] @@ -582,6 +660,53 @@ mod tests { ..Account::default() } } + fn new_submux_client( + clients: Vec>, + dedupe_window_millis: Option, + ) -> SubMuxClient { + let client_tuples = clients + .into_iter() + .map(|c| { + let (_abort_tx, abort_rx) = mpsc::channel(1); + (c, abort_rx) + }) + .collect(); + SubMuxClient::new(client_tuples, dedupe_window_millis) + } + + fn new_submux_client_with_debounce( + clients: Vec>, + config: DebounceConfig, + ) -> SubMuxClient { + let client_tuples = clients + .into_iter() + .map(|c| { + let (_abort_tx, abort_rx) = mpsc::channel(1); + (c, abort_rx) + }) + .collect(); + SubMuxClient::new_with_debounce(client_tuples, config) + } + + fn new_submux_with_abort( + clients: Vec>, + dedupe_window_millis: Option, + ) -> (SubMuxClient, Vec>) { + let mut abort_senders = Vec::new(); + let client_tuples = clients + .into_iter() + .map(|c| { + let (abort_tx, abort_rx) = mpsc::channel(4); + abort_senders.push(abort_tx); + (c, abort_rx) + }) + .collect(); + ( + SubMuxClient::new(client_tuples, dedupe_window_millis), + abort_senders, + ) + } + // ----------------- // Subscribe/Unsubscribe // ----------------- @@ -595,7 +720,7 @@ mod tests { let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); - let mux: SubMuxClient = SubMuxClient::new( + let mux: SubMuxClient = new_submux_client( vec![client1.clone(), client2.clone()], Some(100), ); @@ -648,7 +773,7 @@ mod tests { let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); - let mux: SubMuxClient = SubMuxClient::new( + let mux: SubMuxClient = new_submux_client( vec![client1.clone(), client2.clone()], Some(100), ); @@ -695,7 +820,7 @@ mod tests { let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); - let mux: SubMuxClient = SubMuxClient::new( + let mux: SubMuxClient = new_submux_client( vec![client1.clone(), client2.clone()], Some(100), ); @@ -756,7 +881,7 @@ mod tests { let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); - let mux: SubMuxClient = SubMuxClient::new( + let mux: SubMuxClient = new_submux_client( vec![client1.clone(), client2.clone()], Some(100), ); @@ -819,7 +944,7 @@ mod tests { let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); let client3 = Arc::new(ChainPubsubClientMock::new(tx3, rx3)); - let mux: SubMuxClient = SubMuxClient::new( + let mux: SubMuxClient = new_submux_client( vec![client1.clone(), client2.clone(), client3.clone()], Some(100), ); @@ -949,7 +1074,7 @@ mod tests { let (tx, rx) = mpsc::channel(10_000); let client = Arc::new(ChainPubsubClientMock::new(tx, rx)); let mux: SubMuxClient = - SubMuxClient::new_with_debounce( + new_submux_client_with_debounce( vec![client.clone()], DebounceConfig { dedupe_window_millis: Some(100), @@ -1007,7 +1132,7 @@ mod tests { let (tx, rx) = mpsc::channel(10_000); let client = Arc::new(ChainPubsubClientMock::new(tx, rx)); let mux: SubMuxClient = - SubMuxClient::new_with_debounce( + new_submux_client_with_debounce( vec![client.clone()], DebounceConfig { dedupe_window_millis: Some(100), @@ -1045,7 +1170,7 @@ mod tests { let (tx, rx) = mpsc::channel(10_000); let client = Arc::new(ChainPubsubClientMock::new(tx, rx)); let mux: SubMuxClient = - SubMuxClient::new_with_debounce( + 
new_submux_client_with_debounce( vec![client.clone()], DebounceConfig { dedupe_window_millis: Some(100), @@ -1103,7 +1228,7 @@ mod tests { let (tx, rx) = mpsc::channel(10_000); let client = Arc::new(ChainPubsubClientMock::new(tx, rx)); let mux: SubMuxClient = - SubMuxClient::new_with_debounce( + new_submux_client_with_debounce( vec![client.clone()], DebounceConfig { dedupe_window_millis: Some(100), @@ -1140,60 +1265,120 @@ mod tests { } // ----------------- - // Connection recycling + // Reconnection Tests // ----------------- - async fn setup_recycling( - interval_millis: Option, - ) -> ( - SubMuxClient, - Arc, - Arc, - Arc, - ) { + #[tokio::test] + async fn test_reconnect_on_disconnect_reestablishes_subscriptions() { init_logger(); - let (tx1, rx1) = mpsc::channel(1); - let (tx2, rx2) = mpsc::channel(1); - let (tx3, rx3) = mpsc::channel(1); - let c1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); - let c2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); - let c3 = Arc::new(ChainPubsubClientMock::new(tx3, rx3)); - let mux: SubMuxClient = - SubMuxClient::new_with_configs( - vec![c1.clone(), c2.clone(), c3.clone()], - DebounceConfig::default(), - SubMuxClientConfig { - recycle_interval_millis: interval_millis, - ..SubMuxClientConfig::default() - }, - ); + let (tx1, rx1) = mpsc::channel(10_000); + let (tx2, rx2) = mpsc::channel(10_000); + let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); + let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); - (mux, c1, c2, c3) - } - #[tokio::test] - async fn test_connection_recycling_enabled() { - let (mux, c1, c2, c3) = setup_recycling(Some(50)).await; + let (mux, aborts) = new_submux_with_abort( + vec![client1.clone(), client2.clone()], + Some(100), + ); + let mut mux_rx = mux.take_updates(); + + let pk = Pubkey::new_unique(); + mux.subscribe(pk).await.unwrap(); + + // Baseline: client1 update arrives + client1 + .send_account_update(pk, 1, &account_with_lamports(111)) + .await; + tokio::time::timeout( + std::time::Duration::from_millis(200), + mux_rx.recv(), + ) + .await + .expect("got baseline update") + .expect("stream open"); + + // Simulate disconnect: client1 loses subscriptions and is "disconnected" + client1.simulate_disconnect(); - // allow 4 intervals (at ~50ms each) -> calls: c1,c2,c3,c1 - tokio::time::sleep(Duration::from_millis(220)).await; + // Trigger reconnect via abort channel + aborts[0].send(()).await.expect("abort send"); - assert_eq!(c1.recycle_calls(), 2); - assert_eq!(c2.recycle_calls(), 1); - assert_eq!(c3.recycle_calls(), 1); + // Wait for reconnect to complete + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // After reconnect + resubscribe, client1's updates should be forwarded again + client1 + .send_account_update(pk, 2, &account_with_lamports(222)) + .await; + + let up = tokio::time::timeout( + std::time::Duration::from_secs(1), + mux_rx.recv(), + ) + .await + .expect("expect update after reconnect") + .expect("stream open"); + assert_eq!(up.pubkey, pk); + assert_eq!(up.rpc_response.context.slot, 2); mux.shutdown().await; } #[tokio::test] - async fn test_connection_recycling_disabled() { - let (mux, c1, c2, c3) = setup_recycling(Some(0)).await; + async fn test_reconnect_after_failed_resubscription_eventually_recovers() { + init_logger(); + + let (tx1, rx1) = mpsc::channel(10_000); + let (tx2, rx2) = mpsc::channel(10_000); + let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); + let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); + + let (mux, aborts) = 
new_submux_with_abort( + vec![client1.clone(), client2.clone()], + Some(100), + ); + let mut mux_rx = mux.take_updates(); - // wait enough time to ensure it would have recycled if enabled - tokio::time::sleep(Duration::from_millis(220)).await; + let pk = Pubkey::new_unique(); + mux.subscribe(pk).await.unwrap(); + + // Prepare: first resubscribe attempt will fail + client1.fail_next_resubscriptions(1); + + // Simulate disconnect: client1 loses subs and is disconnected + client1.simulate_disconnect(); + + // Trigger reconnect; first attempt will fail resub; reconnector will retry after ~1s (fib(1)=1) + aborts[0].send(()).await.expect("abort send"); + + // Send updates until one passes after reconnection and resubscribe succeed + // Keep unique slots to avoid dedupe + let mut slot: u64 = 100; + let deadline = Instant::now() + Duration::from_secs(3); + let mut got = None; + while Instant::now() < deadline { + client1 + .send_account_update( + pk, + slot, + &account_with_lamports(1_000 + slot), + ) + .await; + if let Ok(Some(u)) = tokio::time::timeout( + std::time::Duration::from_millis(200), + mux_rx.recv(), + ) + .await + { + got = Some(u); + break; + } + slot += 1; + } - assert_eq!(c1.recycle_calls(), 0); - assert_eq!(c2.recycle_calls(), 0); - assert_eq!(c3.recycle_calls(), 0); + let up = got.expect("should receive update after retry reconnect"); + assert_eq!(up.pubkey, pk); + assert!(up.rpc_response.context.slot >= 100); mux.shutdown().await; } diff --git a/magicblock-chainlink/src/testing/chain_pubsub.rs b/magicblock-chainlink/src/testing/chain_pubsub.rs index 94f1e8dc7..56a4157d5 100644 --- a/magicblock-chainlink/src/testing/chain_pubsub.rs +++ b/magicblock-chainlink/src/testing/chain_pubsub.rs @@ -16,8 +16,10 @@ pub async fn setup_actor_and_client() -> ( mpsc::Receiver, RpcClient, ) { + let (tx, _) = mpsc::channel(10); let (actor, updates_rx) = ChainPubsubActor::new_from_url( PUBSUB_URL, + tx, CommitmentConfig::confirmed(), ) .await @@ -54,13 +56,13 @@ pub async fn unsubscribe(actor: &ChainPubsubActor, pubkey: Pubkey) { .expect("unsubscribe failed"); } -pub async fn recycle(actor: &ChainPubsubActor) { +pub async fn reconnect(actor: &ChainPubsubActor) { let (tx, rx) = oneshot::channel(); actor - .send_msg(ChainPubsubActorMessage::RecycleConnections { response: tx }) + .send_msg(ChainPubsubActorMessage::Reconnect { response: tx }) .await - .expect("failed to send RecycleConnections message"); + .expect("failed to send Reconnect message"); rx.await - .expect("recycle ack channel dropped") - .expect("recycle failed"); + .expect("reconnect ack channel dropped") + .expect("reconnect failed"); } diff --git a/magicblock-metrics/src/metrics/mod.rs b/magicblock-metrics/src/metrics/mod.rs index 09084eed4..826949fe1 100644 --- a/magicblock-metrics/src/metrics/mod.rs +++ b/magicblock-metrics/src/metrics/mod.rs @@ -103,11 +103,6 @@ lazy_static::lazy_static! 
{ "monitored_accounts", "number of undelegated accounts, being monitored via websocket", ).unwrap(); - static ref SUBSCRIPTIONS_COUNT_GAUGE: IntGaugeVec = IntGaugeVec::new( - Opts::new("subscriptions_count", "number of active account subscriptions"), - &["shard"], - ).unwrap(); - static ref EVICTED_ACCOUNTS_COUNT: IntGauge = IntGauge::new( "evicted_accounts", "number of accounts forcefully removed from monitored list and database", ).unwrap(); @@ -238,7 +233,6 @@ pub(crate) fn register() { register!(ACCOUNTS_COUNT_GAUGE); register!(PENDING_ACCOUNT_CLONES_GAUGE); register!(MONITORED_ACCOUNTS_GAUGE); - register!(SUBSCRIPTIONS_COUNT_GAUGE); register!(EVICTED_ACCOUNTS_COUNT); register!(COMMITTOR_INTENTS_BACKLOG_COUNT); register!(COMMITTOR_FAILED_INTENTS_COUNT); @@ -263,12 +257,6 @@ pub fn set_cached_clone_outputs_count(count: usize) { CACHED_CLONE_OUTPUTS_COUNT.set(count as i64); } -pub fn set_subscriptions_count(count: usize, shard: &str) { - SUBSCRIPTIONS_COUNT_GAUGE - .with_label_values(&[shard]) - .set(count as i64); -} - pub fn set_ledger_size(size: u64) { LEDGER_SIZE_GAUGE.set(size as i64); } @@ -329,7 +317,7 @@ pub fn ensure_accounts_end(timer: HistogramTimer) { timer.stop_and_record(); } -pub fn adjust_monitored_accounts_count(count: usize) { +pub fn set_monitored_accounts_count(count: usize) { MONITORED_ACCOUNTS_GAUGE.set(count as i64); } pub fn inc_evicted_accounts_count() { diff --git a/magicblock-processor/src/executor/processing.rs b/magicblock-processor/src/executor/processing.rs index 6bbd88098..cf1fce89d 100644 --- a/magicblock-processor/src/executor/processing.rs +++ b/magicblock-processor/src/executor/processing.rs @@ -7,6 +7,7 @@ use magicblock_core::link::{ }, }; use magicblock_metrics::metrics::FAILED_TRANSACTIONS_COUNT; +use solana_account::ReadableAccount; use solana_pubkey::Pubkey; use solana_svm::{ account_loader::{AccountsBalances, CheckedTransactionDetails}, @@ -285,8 +286,10 @@ impl super::TransactionExecutor { for (pubkey, account) in accounts { // only persist account's update if it was actually modified, ignore - // the rest, even if an account was writeable in the transaction - if !account.is_dirty() { + // the rest, even if an account was writeable in the transaction. We + // also don't persist accounts that are empty, since those are managed + // by the chainlink, and we cannot interfere with its logic here. 
+ if !account.is_dirty() || account.lamports() == 0 { continue; } self.accountsdb.insert_account(pubkey, account); diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 00b1f92e4..b4f67e155 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -2934,6 +2934,8 @@ dependencies = [ "solana-transaction-status", "tempfile", "toml 0.8.23", + "ureq", + "url 2.5.4", ] [[package]] @@ -3611,6 +3613,7 @@ dependencies = [ name = "magicblock-chainlink" version = "0.2.3" dependencies = [ + "arc-swap", "async-trait", "bincode", "env_logger 0.11.8", @@ -3620,6 +3623,7 @@ dependencies = [ "magicblock-core", "magicblock-delegation-program", "magicblock-magic-program-api 0.2.3", + "magicblock-metrics", "serde_json", "solana-account", "solana-account-decoder", @@ -5626,6 +5630,7 @@ version = "0.23.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" dependencies = [ + "log", "once_cell", "ring", "rustls-pki-types", @@ -11186,6 +11191,22 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "ureq" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" +dependencies = [ + "base64 0.22.1", + "flate2", + "log", + "once_cell", + "rustls 0.23.28", + "rustls-pki-types", + "url 2.5.4", + "webpki-roots 0.26.11", +] + [[package]] name = "uriparse" version = "0.6.4" @@ -11450,6 +11471,24 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.4", +] + +[[package]] +name = "webpki-roots" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" diff --git a/test-integration/Cargo.toml b/test-integration/Cargo.toml index 9720dd913..56c9e3260 100644 --- a/test-integration/Cargo.toml +++ b/test-integration/Cargo.toml @@ -97,6 +97,8 @@ test-ledger-restore = { path = "./test-ledger-restore" } test-kit = { path = "../test-kit" } tokio = "1.0" toml = "0.8.13" +ureq = "2.9.6" +url = "2.5.0" [patch.crates-io] # some solana dependencies have solana-storage-proto as dependency diff --git a/test-integration/test-chainlink/src/test_context.rs b/test-integration/test-chainlink/src/test_context.rs index 7c9bbad55..f0082fb49 100644 --- a/test-integration/test-chainlink/src/test_context.rs +++ b/test-integration/test-chainlink/src/test_context.rs @@ -67,14 +67,18 @@ impl TestContext { let faucet_pubkey = Pubkey::new_unique(); let (fetch_cloner, remote_account_provider) = { let (tx, rx) = tokio::sync::mpsc::channel(100); + let config = RemoteAccountProviderConfig::try_new_with_metrics( + 1000, // subscribed_accounts_lru_capacity + lifecycle_mode, + false, // disable subscription metrics + ) + .unwrap(); let remote_account_provider = RemoteAccountProvider::try_from_clients_and_mode( rpc_client.clone(), pubsub_client.clone(), tx, - 
&RemoteAccountProviderConfig::default_with_lifecycle_mode( - lifecycle_mode, - ), + &config, ) .await; diff --git a/test-integration/test-chainlink/tests/chain_pubsub_actor.rs b/test-integration/test-chainlink/tests/chain_pubsub_actor.rs index 087eab526..66c2b9c08 100644 --- a/test-integration/test-chainlink/tests/chain_pubsub_actor.rs +++ b/test-integration/test-chainlink/tests/chain_pubsub_actor.rs @@ -2,7 +2,7 @@ use magicblock_chainlink::{ remote_account_provider::SubscriptionUpdate, testing::{ chain_pubsub::{ - recycle, setup_actor_and_client, subscribe, unsubscribe, + reconnect, setup_actor_and_client, subscribe, unsubscribe, }, utils::{airdrop, init_logger, random_pubkey}, }, @@ -90,9 +90,16 @@ async fn ixtest_recycle_connections() { .await; // 5. Recycle connections - recycle(&actor).await; + reconnect(&actor).await; - // 6. Airdrop again and ensure we receive the update again + // 6. Airdrop again and ensure we don't yet receive the update + airdrop(&rpc_client, &pubkey, 2_500_000).await; + expect_no_update_for(&mut updates_rx, pubkey, 1500).await; + + // 7. Resubscribe to the account + subscribe(&actor, pubkey).await; + + // 8. Airdrop again and ensure we receive the update again let _second_update = airdrop_and_expect_update( &rpc_client, &mut updates_rx, @@ -144,7 +151,20 @@ async fn ixtest_recycle_connections_multiple_accounts() { unsubscribe(&actor, unsub_pk).await; // Recycle connections - recycle(&actor).await; + reconnect(&actor).await; + + // Airdrop to each and ensure we receive no updates yet + for &pk in &pks { + airdrop(&rpc_client, &pk, 2_500_000).await; + } + for &pk in &pks { + expect_no_update_for(&mut updates_rx, pk, 1500).await; + } + + // Resubscribe to first three + for &pk in &pks[0..3] { + subscribe(&actor, pk).await; + } // Airdrop to first three and expect updates for &pk in &pks[0..3] { diff --git a/test-integration/test-chainlink/tests/chain_pubsub_client.rs b/test-integration/test-chainlink/tests/chain_pubsub_client.rs index f34c011b4..21ebbcea1 100644 --- a/test-integration/test-chainlink/tests/chain_pubsub_client.rs +++ b/test-integration/test-chainlink/tests/chain_pubsub_client.rs @@ -23,8 +23,10 @@ use tokio::{sync::mpsc, task}; async fn setup() -> (ChainPubsubClientImpl, mpsc::Receiver) { init_logger(); + let (tx, _) = mpsc::channel(10); let client = ChainPubsubClientImpl::try_new_from_url( PUBSUB_URL, + tx, CommitmentConfig::confirmed(), ) .await diff --git a/test-integration/test-chainlink/tests/ix_exceed_capacity.rs b/test-integration/test-chainlink/tests/ix_exceed_capacity.rs index 44c2d69c6..cc76a94c4 100644 --- a/test-integration/test-chainlink/tests/ix_exceed_capacity.rs +++ b/test-integration/test-chainlink/tests/ix_exceed_capacity.rs @@ -11,9 +11,10 @@ async fn setup( pubkeys_len: usize, ) -> (IxtestContext, Vec) { let config = { - let rap_config = RemoteAccountProviderConfig::try_new( + let rap_config = RemoteAccountProviderConfig::try_new_with_metrics( subscribed_accounts_lru_capacity, LifecycleMode::Ephemeral, + false, ) .unwrap() ChainlinkConfig::new(rap_config) diff --git a/test-integration/test-chainlink/tests/ix_remote_account_provider.rs b/test-integration/test-chainlink/tests/ix_remote_account_provider.rs index 47534ab03..0b1e81833 100644 --- a/test-integration/test-chainlink/tests/ix_remote_account_provider.rs +++ b/test-integration/test-chainlink/tests/ix_remote_account_provider.rs @@ -37,9 +37,12 @@ async fn init_remote_account_provider() -> RemoteAccountProvider< &endpoints, CommitmentConfig::confirmed(), fwd_tx, -
&RemoteAccountProviderConfig::default_with_lifecycle_mode( + &RemoteAccountProviderConfig::try_new_with_metrics( + 1000, LifecycleMode::Ephemeral, - ), + false, + ) + .unwrap(), ) .await .unwrap() diff --git a/test-integration/test-cloning/tests/07_subscription_limits.rs b/test-integration/test-cloning/tests/07_subscription_limits.rs new file mode 100644 index 000000000..67f53541f --- /dev/null +++ b/test-integration/test-cloning/tests/07_subscription_limits.rs @@ -0,0 +1,121 @@ +use std::{sync::Arc, time::Duration}; + +use integration_test_tools::IntegrationTestContext; +use log::*; +use solana_sdk::{ + native_token::LAMPORTS_PER_SOL, rent::Rent, signature::Keypair, + signer::Signer, +}; +use test_kit::init_logger; +use tokio::task::JoinSet; + +const NUM_PUBKEYS: usize = 400; +// Half of the accounts are delegated and aren't watched +const EXTRA_MONITORED_ACCOUNTS: usize = NUM_PUBKEYS / 2; +const AIRDROP_CHUNK_SIZE: usize = 100; +// See metrics config in: configs/cloning-conf.ephem.toml +const PORT: u16 = 9000; + +// This test creates a large number of accounts, airdrops to all of them +// and delegates half. +// It then ensures that the subscription count increased as expected. +// Since it will be affected by other tests that trigger subscriptions, +// we only run it in isolation manually. +#[ignore = "Run manually only"] +#[tokio::test(flavor = "multi_thread")] +async fn test_large_number_of_account_subscriptions() { + init_logger!(); + let ctx = Arc::new(IntegrationTestContext::try_new().unwrap()); + + debug!("Generating {NUM_PUBKEYS} keypairs..."); + let keypairs: Vec = + (0..NUM_PUBKEYS).map(|_| Keypair::new()).collect(); + debug!("✅ Generated {NUM_PUBKEYS} keypairs"); + + let rent_exempt_amount = Rent::default().minimum_balance(0); + debug!( + "Airdropping {rent_exempt_amount} lamports to {NUM_PUBKEYS} accounts in chunks of {AIRDROP_CHUNK_SIZE}..." 
+ ); + + let payer_chain = Keypair::new(); + ctx.airdrop_chain(&payer_chain.pubkey(), LAMPORTS_PER_SOL * 10) + .expect("failed to airdrop to payer_chain"); + + let monitored_accounts_before = + ctx.get_monitored_accounts_count(PORT).unwrap(); + let mut total_processed = 0; + for (chunk_idx, chunk) in keypairs.chunks(AIRDROP_CHUNK_SIZE).enumerate() { + let mut join_set = JoinSet::new(); + for (idx, keypair) in chunk.iter().enumerate() { + let keypair = keypair.insecure_clone(); + let payer_chain = payer_chain.insecure_clone(); + let ctx = ctx.clone(); + join_set.spawn(async move { + if idx % 2 == 0 { + ctx.airdrop_chain_and_delegate( + &payer_chain, + &keypair, + rent_exempt_amount, + ) + .expect( + "failed to airdrop and delegate to on-chain account", + ); + } else { + ctx.airdrop_chain(&keypair.pubkey(), rent_exempt_amount) + .expect("failed to airdrop to on-chain account"); + } + }); + } + for _result in join_set.join_all().await { + // spawned task panicked or was cancelled - handled by join_all + } + total_processed += chunk.len(); + + let pubkeys = chunk.iter().map(|kp| kp.pubkey()).collect::>(); + + trace!( + "Pubkeys in chunk {}: {}", + chunk_idx + 1, + pubkeys + .iter() + .map(|k| k.to_string()) + .collect::>() + .join(", ") + ); + + debug!( + "✅ Airdropped batch {}: {}/{} accounts ({} total)", + chunk_idx + 1, + chunk.len(), + AIRDROP_CHUNK_SIZE, + total_processed + ); + + let _accounts = ctx + .fetch_ephem_multiple_accounts(&pubkeys) + .expect("failed to fetch accounts"); + + debug!( + "✅ Fetched batch {}: {}/{} accounts ({} total)", + chunk_idx + 1, + chunk.len(), + AIRDROP_CHUNK_SIZE, + total_processed + ); + } + + debug!("✅ Airdropped and fetched all {NUM_PUBKEYS} accounts from ephemeral RPC"); + + // Wait for metrics update + tokio::time::sleep(Duration::from_secs(5)).await; + + let monitored_accounts_after = + ctx.get_monitored_accounts_count(PORT).unwrap(); + let diff = monitored_accounts_after - monitored_accounts_before; + debug!("Monitored accounts count total: {monitored_accounts_after}, diff: {diff}"); + + assert_eq!( + diff, EXTRA_MONITORED_ACCOUNTS, + "Expected monitored accounts to increase by {EXTRA_MONITORED_ACCOUNTS}" + ); +} diff --git a/test-integration/test-tools/Cargo.toml b/test-integration/test-tools/Cargo.toml index 0f9d4524c..75ea22b36 100644 --- a/test-integration/test-tools/Cargo.toml +++ b/test-integration/test-tools/Cargo.toml @@ -11,6 +11,8 @@ log = { workspace = true } random-port = { workspace = true } rayon = { workspace = true } serde = { workspace = true } +ureq = { workspace = true } +url = { workspace = true } magicblock-core = { workspace = true } magicblock-config = { workspace = true } magicblock-delegation-program = { workspace = true, features = [ diff --git a/test-integration/test-tools/src/integration_test_context.rs b/test-integration/test-tools/src/integration_test_context.rs index f31287102..48ae3a911 100644 --- a/test-integration/test-tools/src/integration_test_context.rs +++ b/test-integration/test-tools/src/integration_test_context.rs @@ -29,6 +29,7 @@ use solana_transaction_status::{ EncodedConfirmedBlock, EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding, }; +use url::Url; use crate::{ dlp_interface, @@ -1148,4 +1149,57 @@ impl IntegrationTestContext { pub fn ws_url_chain() -> &'static str { WS_URL_CHAIN } + + // ----------------- + // Prometheus Metrics + // ----------------- + pub fn get_monitored_accounts_count(&self, port: u16) -> Result { + let ephem_url = self.try_ephem_client()?.url(); + let parsed_url = 
Url::parse(&ephem_url).map_err(|e| { + anyhow::anyhow!( + "Failed to parse ephemeral URL '{}': {}", + ephem_url, + e + ) + })?; + let host = parsed_url.host_str().ok_or_else(|| { + anyhow::anyhow!("No host found in ephemeral URL: {}", ephem_url) + })?; + let metrics_url = format!("http://{host}:{port}/metrics"); + let response = ureq::get(&metrics_url) + .call() + .map_err(|e| { + anyhow::anyhow!( + "Failed to fetch metrics from {}: {}", + metrics_url, + e + ) + })? + .into_string() + .map_err(|e| { + anyhow::anyhow!("Failed to read metrics response: {}", e) + })?; + + for line in response.lines() { + if line.starts_with("mbv_monitored_accounts ") { + let value_str = + line.split_whitespace().nth(1).ok_or_else(|| { + anyhow::anyhow!( + "Failed to parse monitored_accounts metric" + ) + })?; + return value_str.parse::().map_err(|e| { + anyhow::anyhow!( + "Failed to parse monitored_accounts value '{}': {}", + value_str, + e + ) + }); + } + } + + Err(anyhow::anyhow!( + "monitored_accounts metric not found in Prometheus response" + )) + } }
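The unsubscribe path in the RemoteAccountProvider changes above only evicts a pubkey from the LRU-backed subscription set after the pubsub unsubscribe has succeeded, so the local bookkeeping never drops entries the remote side still holds. The following is a minimal standalone sketch of that ordering, using simplified stand-in types rather than the crate's real API:

use std::{collections::HashSet, sync::Mutex};

// Hypothetical stand-ins for the LRU cache and the pubsub client.
struct Subscriptions(Mutex<HashSet<u64>>);
struct Pubsub;

impl Pubsub {
    fn unsubscribe(&self, _key: u64) -> Result<(), String> {
        Ok(()) // pretend the remote call succeeded
    }
}

struct Provider {
    subscribed: Subscriptions,
    pubsub: Pubsub,
}

impl Provider {
    fn unsubscribe(&self, key: u64) -> Result<(), String> {
        // Early return mirrors the warn!-and-return branch for unknown keys.
        if !self.subscribed.0.lock().unwrap().contains(&key) {
            return Ok(());
        }
        match self.pubsub.unsubscribe(key) {
            // Remove the local entry only after the remote call succeeded.
            Ok(()) => {
                self.subscribed.0.lock().unwrap().remove(&key);
                Ok(())
            }
            // On failure keep the entry so cache and pubsub client stay in
            // sync and a later retry still sees the subscription.
            Err(err) => Err(err),
        }
    }
}

fn main() {
    let provider = Provider {
        subscribed: Subscriptions(Mutex::new(HashSet::from([42]))),
        pubsub: Pubsub,
    };
    assert!(provider.unsubscribe(42).is_ok());
    assert!(!provider.subscribed.0.lock().unwrap().contains(&42));
}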
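When the account fetch gives up (max retries exhausted or a terminal RPC error), the new notify_error helper wakes every pending oneshot waiter with an error instead of letting the senders drop silently. Below is a compact sketch of that fan-out pattern, assuming a tokio runtime with the sync/time/macros features and simplified key and result types (not the provider's real signatures):

use std::{collections::HashMap, sync::{Arc, Mutex}};
use tokio::sync::oneshot;

type FetchResult = Result<u64, String>;
// Pending waiters per account key, a simplified stand-in for the
// provider's `fetching_accounts` map.
type Pending = Arc<Mutex<HashMap<&'static str, Vec<oneshot::Sender<FetchResult>>>>>;

// On a terminal failure every waiter receives an error instead of hanging
// on a oneshot whose sender is silently dropped.
fn notify_error(pending: &Pending, keys: &[&'static str], msg: &str) {
    let mut map = pending.lock().unwrap();
    for key in keys {
        if let Some(senders) = map.remove(key) {
            for tx in senders {
                let _ = tx.send(Err(format!("{key}: {msg}")));
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let pending: Pending = Arc::new(Mutex::new(HashMap::new()));
    let (tx, rx) = oneshot::channel();
    pending.lock().unwrap().entry("acc1").or_default().push(tx);

    const MAX_RETRIES: u32 = 3;
    let mut remaining = MAX_RETRIES;
    loop {
        // Pretend every attempt fails, as when the RPC keeps erroring out.
        remaining -= 1;
        if remaining == 0 {
            notify_error(&pending, &["acc1"], "max retries reached");
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }
    // The waiter observes the failure instead of blocking forever.
    assert!(rx.await.unwrap().is_err());
}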
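reconnect_client_with_backoff sleeps fib(attempt) seconds between attempts, saturating and capped at 600, so retries back off quickly at first and then settle into a ten-minute poll. The helper below reproduces the fib_with_max shape from the diff and makes the growth of the wait times visible:

// Same shape as `fib_with_max` in the diff: Fibonacci seconds, saturating
// addition, capped at ten minutes.
fn fib_with_max(n: u64) -> u64 {
    let (mut a, mut b) = (0u64, 1u64);
    for _ in 0..n {
        (a, b) = (b, a.saturating_add(b));
    }
    a.min(600)
}

fn main() {
    // Attempts 1..=10 wait 1, 1, 2, 3, 5, 8, 13, 21, 34, 55 seconds.
    let waits: Vec<u64> = (1..=10).map(fib_with_max).collect();
    assert_eq!(waits, vec![1, 1, 2, 3, 5, 8, 13, 21, 34, 55]);
    // Far-out attempts are clamped to the 600s ceiling.
    assert_eq!(fib_with_max(30), 600);
}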
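Each inner pubsub client gets its own reconnector task; before reconnecting, the task drains any abort signals that queued up so a burst of disconnect notifications collapses into a single reconnect attempt. A self-contained sketch of that drain loop follows, with the actual try_reconnect/resubscribe calls stubbed out:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (abort_tx, mut abort_rx) = mpsc::channel::<()>(4);

    // Simulate a burst of disconnect notifications.
    for _ in 0..3 {
        abort_tx.send(()).await.unwrap();
    }
    drop(abort_tx); // close the channel so the loop below terminates

    let mut reconnects = 0;
    while abort_rx.recv().await.is_some() {
        // Drain duplicates that queued up while we were reconnecting,
        // mirroring the `try_recv` drain loop in `spawn_reconnectors`.
        while abort_rx.try_recv().is_ok() {}
        reconnects += 1; // a real impl would reconnect + resubscribe here
    }
    assert_eq!(reconnects, 1);
}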
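Because one inner client may be mid-reconnect and temporarily hold no subscriptions, the mux reports the maximum per-client subscription count and resubscribes a recovered client from the union of all clients' subscription sets. A sketch over plain HashSets rather than the real ChainPubsubClient trait:

use std::collections::HashSet;

// Simplified stand-in: each inner client exposes its current subscription set.
// Take the maximum, not the sum: healthy clients mirror each other, while a
// reconnecting one may briefly report fewer entries.
fn subscription_count(clients: &[HashSet<&str>]) -> usize {
    clients.iter().map(|c| c.len()).max().unwrap_or(0)
}

// Union across clients, so a reconnecting client can be resubscribed from
// the view of the still-connected ones.
fn subscriptions<'a>(clients: &[HashSet<&'a str>]) -> HashSet<&'a str> {
    clients.iter().flat_map(|c| c.iter().copied()).collect()
}

fn main() {
    let healthy: HashSet<&str> = ["a", "b", "c"].into_iter().collect();
    let reconnecting: HashSet<&str> = ["a"].into_iter().collect();
    let clients = vec![healthy, reconnecting];
    assert_eq!(subscription_count(&clients), 3);
    assert_eq!(subscriptions(&clients).len(), 3);
}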
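The executor change in magicblock-processor persists an account only if the transaction actually modified it and it still holds lamports; empty accounts are left to the cloning pipeline. A tiny sketch of that guard with a simplified account type (the real code uses is_dirty() and ReadableAccount::lamports()):

// Hypothetical simplified account; only the two fields the guard inspects.
struct Account {
    dirty: bool,
    lamports: u64,
}

fn should_persist(account: &Account) -> bool {
    account.dirty && account.lamports > 0
}

fn main() {
    assert!(!should_persist(&Account { dirty: false, lamports: 10 }));
    assert!(!should_persist(&Account { dirty: true, lamports: 0 }));
    assert!(should_persist(&Account { dirty: true, lamports: 10 }));
}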
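get_monitored_accounts_count scrapes the ephemeral validator's /metrics endpoint and picks a single gauge out of the Prometheus text exposition format. The parsing step in isolation looks roughly like the sketch below; the ureq HTTP call is omitted, and the metric name is the one the test helper already matches on:

// Extract a gauge value from Prometheus text exposition format, as the
// integration-test helper does after fetching the /metrics body.
fn parse_gauge(body: &str, name: &str) -> Option<u64> {
    let prefix = format!("{name} ");
    body.lines()
        .find(|line| line.starts_with(&prefix))
        .and_then(|line| line.split_whitespace().nth(1))
        .and_then(|value| value.parse().ok())
}

fn main() {
    let body = "\
# HELP mbv_monitored_accounts number of undelegated accounts, being monitored via websocket
# TYPE mbv_monitored_accounts gauge
mbv_monitored_accounts 200
";
    assert_eq!(parse_gauge(body, "mbv_monitored_accounts"), Some(200));
}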