Skip to content

Commit 3a8334f

Browse files
thlorenzDodecahedr0x
authored andcommitted
feat: track program subscription account updates (#767)
1 parent a0a8829 commit 3a8334f

File tree

4 files changed

+60
-51
lines changed

4 files changed

+60
-51
lines changed

.github/CODEOWNERS

Lines changed: 0 additions & 28 deletions
This file was deleted.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99

1010
use futures_util::stream::FuturesUnordered;
1111
use log::*;
12+
use magicblock_metrics::metrics::inc_program_subscription_account_updates;
1213
use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding};
1314
use solana_commitment_config::CommitmentConfig;
1415
use solana_pubkey::Pubkey;
@@ -522,7 +523,7 @@ impl ChainPubsubActor {
522523
}
523524
#[allow(clippy::too_many_arguments)]
524525
async fn add_program_sub(
525-
pubkey: Pubkey,
526+
program_pubkey: Pubkey,
526527
sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
527528
subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
528529
program_subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
@@ -536,14 +537,14 @@ impl ChainPubsubActor {
536537
if program_subs
537538
.lock()
538539
.expect("program subscriptions lock poisoned")
539-
.contains_key(&pubkey)
540+
.contains_key(&program_pubkey)
540541
{
541-
trace!("[client_id={client_id}] Program subscription for {pubkey} already exists, ignoring add_program_sub request");
542+
trace!("[client_id={client_id}] Program subscription for {program_pubkey} already exists, ignoring add_program_sub request");
542543
let _ = sub_response.send(Ok(()));
543544
return;
544545
}
545546

546-
trace!("[client_id={client_id}] Adding program subscription for {pubkey} with commitment {commitment_config:?}");
547+
trace!("[client_id={client_id}] Adding program subscription for {program_pubkey} with commitment {commitment_config:?}");
547548

548549
let cancellation_token = CancellationToken::new();
549550

@@ -552,7 +553,7 @@ impl ChainPubsubActor {
552553
.lock()
553554
.expect("program subscriptions lock poisoned");
554555
program_subs_lock.insert(
555-
pubkey,
556+
program_pubkey,
556557
AccountSubscription {
557558
cancellation_token: cancellation_token.clone(),
558559
},
@@ -569,12 +570,12 @@ impl ChainPubsubActor {
569570
};
570571

571572
let (mut update_stream, unsubscribe) = match pubsub_connection
572-
.program_subscribe(&pubkey, config.clone())
573+
.program_subscribe(&program_pubkey, config.clone())
573574
.await
574575
{
575576
Ok(res) => res,
576577
Err(err) => {
577-
error!("[client_id={client_id}] Failed to subscribe to program {pubkey} {err:?}");
578+
error!("[client_id={client_id}] Failed to subscribe to program {program_pubkey} {err:?}");
578579
Self::abort_and_signal_connection_issue(
579580
client_id,
580581
subs.clone(),
@@ -597,30 +598,37 @@ impl ChainPubsubActor {
597598
loop {
598599
tokio::select! {
599600
_ = cancellation_token.cancelled() => {
600-
trace!("[client_id={client_id}] Subscription for program {pubkey} was cancelled");
601+
trace!("[client_id={client_id}] Subscription for program {program_pubkey} was cancelled");
601602
break;
602603
}
603604
update = update_stream.next() => {
604605
if let Some(rpc_response) = update {
605-
let pubkey = rpc_response.value.pubkey
606+
let acc_pubkey = rpc_response.value.pubkey
606607
.parse::<Pubkey>().inspect_err(|err| {
607608
warn!("[client_id={client_id}] Received invalid pubkey in program subscription update: {} {:?}", rpc_response.value.pubkey, err);
608609
});
609-
if let Ok(pubkey) = pubkey {
610-
if subs.lock().expect("subscriptions lock poisoned").contains_key(&pubkey) {
611-
let _ = subscription_updates_sender.send(SubscriptionUpdate {
612-
pubkey,
613-
rpc_response: RpcResponse {
614-
context: rpc_response.context,
615-
value: rpc_response.value.account,
616-
},
617-
}).await.inspect_err(|err| {
618-
error!("[client_id={client_id}] Failed to send {pubkey} subscription update: {err:?}");
619-
});
610+
if let Ok(acc_pubkey) = acc_pubkey {
611+
if subs.lock().expect("subscriptions lock poisoned").contains_key(&acc_pubkey) {
612+
let sub_update = SubscriptionUpdate {
613+
pubkey: acc_pubkey,
614+
rpc_response: RpcResponse {
615+
context: rpc_response.context,
616+
value: rpc_response.value.account,
617+
},
618+
};
619+
trace!("[client_id={client_id}] Sending program {program_pubkey} account update: {sub_update:?}");
620+
inc_program_subscription_account_updates(
621+
&client_id.to_string(),
622+
);
623+
let _ = subscription_updates_sender.send(sub_update)
624+
.await
625+
.inspect_err(|err| {
626+
error!("[client_id={client_id}] Failed to send {acc_pubkey} subscription update: {err:?}");
627+
});
620628
}
621629
}
622630
} else {
623-
debug!("[client_id={client_id}] Subscription for program {pubkey} ended (EOF); signaling connection issue");
631+
debug!("[client_id={client_id}] Subscription for program {program_pubkey} ended (EOF); signaling connection issue");
624632
Self::abort_and_signal_connection_issue(
625633
client_id,
626634
subs.clone(),
@@ -643,13 +651,13 @@ impl ChainPubsubActor {
643651
.is_err()
644652
{
645653
warn!(
646-
"[client_id={client_id}] unsubscribe timed out for program {pubkey}"
654+
"[client_id={client_id}] unsubscribe timed out for program {program_pubkey}"
647655
);
648656
}
649657
program_subs
650658
.lock()
651659
.expect("program_subs lock poisoned")
652-
.remove(&pubkey);
660+
.remove(&program_pubkey);
653661
});
654662
}
655663

magicblock-metrics/src/metrics/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,16 @@ lazy_static::lazy_static! {
162162
"evicted_accounts_count", "Total cumulative number of accounts forcefully removed from monitored list and database (monotonically increasing)",
163163
).unwrap();
164164

165+
static ref PROGRAM_SUBSCRIPTION_ACCOUNT_UPDATES: IntCounterVec =
166+
IntCounterVec::new(
167+
Opts::new(
168+
"program_subscription_account_updates",
169+
"Number of account updates received via program subscription",
170+
),
171+
&["client_id"],
172+
)
173+
.unwrap();
174+
165175
// -----------------
166176
// RPC/Aperture
167177
// -----------------
@@ -410,6 +420,7 @@ pub(crate) fn register() {
410420
register!(PENDING_ACCOUNT_CLONES_GAUGE);
411421
register!(MONITORED_ACCOUNTS_GAUGE);
412422
register!(EVICTED_ACCOUNTS_COUNT);
423+
register!(PROGRAM_SUBSCRIPTION_ACCOUNT_UPDATES);
413424
register!(COMMITTOR_INTENTS_COUNT);
414425
register!(COMMITTOR_INTENTS_BACKLOG_COUNT);
415426
register!(COMMITTOR_FAILED_INTENTS_COUNT);
@@ -627,6 +638,12 @@ pub fn inc_account_fetches_not_found(
627638
.inc_by(count);
628639
}
629640

641+
pub fn inc_program_subscription_account_updates(client_id: &impl LabelValue) {
642+
PROGRAM_SUBSCRIPTION_ACCOUNT_UPDATES
643+
.with_label_values(&[client_id.value()])
644+
.inc();
645+
}
646+
630647
pub fn inc_per_program_account_fetch_stats(
631648
program_id: &str,
632649
result: ProgramFetchResult,

magicblock-metrics/src/metrics/types.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,18 @@ pub trait LabelValue {
103103
fn value(&self) -> &str;
104104
}
105105

106+
impl LabelValue for &str {
107+
fn value(&self) -> &str {
108+
self
109+
}
110+
}
111+
112+
impl LabelValue for String {
113+
fn value(&self) -> &str {
114+
self
115+
}
116+
}
117+
106118
impl<T, E> LabelValue for Result<T, E>
107119
where
108120
T: LabelValue,

0 commit comments

Comments
 (0)