Skip to content

Commit b2882c3

Browse files
bmuddha and thlorenz authored
fix: prevent new subscriptions if no free connections exist (#456)
This fix changes the subscription handling logic so that subscription processing pauses briefly when all of the existing connections are busy with in-flight subscription requests, thus avoiding an attempt to send a subscription when no connection is available to handle it. --------- Co-authored-by: Thorsten Lorenz <thlorenz@gmx.de>
1 parent 7fa860e commit b2882c3

File tree

1 file changed

+21
-6
lines changed

1 file changed

+21
-6
lines changed

magicblock-account-updates/src/remote_account_updates_shard.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
pin::Pin,
77
rc::Rc,
88
sync::{Arc, RwLock},
9+
time::Duration,
910
};
1011

1112
use futures_util::{stream::FuturesUnordered, FutureExt, Stream, StreamExt};
@@ -39,6 +40,8 @@ pub enum RemoteAccountUpdatesShardError {
3940
#[from]
4041
solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError,
4142
),
43+
#[error("failed to subscribe to remote account updates")]
44+
SubscriptionTimeout,
4245
}
4346

4447
pub struct RemoteAccountUpdatesShard {
@@ -125,7 +128,7 @@ impl RemoteAccountUpdatesShard {
125128
self.try_to_override_last_known_update_slot(clock::ID, clock_slot);
126129
}
127130
// When we receive a message to start monitoring an account
128-
Some((pubkey, unsub)) = self.monitoring_request_receiver.recv() => {
131+
Some((pubkey, unsub)) = self.monitoring_request_receiver.recv(), if !self.pool.is_empty() => {
129132
if unsub {
130133
account_streams.remove(&pubkey);
131134
metrics::set_subscriptions_count(account_streams.len(), &self.shard_id);
@@ -145,6 +148,7 @@ impl RemoteAccountUpdatesShard {
145148
Ok(s) => s,
146149
Err(e) => {
147150
warn!("shard {} failed to websocket subscribe to {pubkey}: {e}", self.shard_id);
151+
self.pool.clients.push(result.client);
148152
continue;
149153
}
150154
};
@@ -275,22 +279,28 @@ impl PubsubPool {
275279
let client = self.clients.pop().expect(
276280
"websocket connection pool always has at least one connection",
277281
);
282+
const SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(30);
278283
let config = Some(self.config.clone());
279284
async move {
280-
let result = client
281-
.inner
282-
.account_subscribe(&pubkey, config)
283-
.await
285+
let request = client.inner.account_subscribe(&pubkey, config);
286+
let request_with_timeout =
287+
tokio::time::timeout(SUBSCRIPTION_TIMEOUT, request);
288+
let Ok(result) = request_with_timeout.await else {
289+
let result =
290+
Err(RemoteAccountUpdatesShardError::SubscriptionTimeout);
291+
return SubscriptionResult { result, client };
292+
};
293+
let result = result
284294
.map_err(RemoteAccountUpdatesShardError::PubsubClientError)
285295
.map(|(stream, unsub)| {
286296
// SAFETY:
287297
// we never drop the PubsubPool before the returned subscription stream
288298
// so the lifetime of the stream can be safely extended to 'static
289299
#[allow(clippy::missing_transmute_annotations)]
290300
let stream = unsafe { std::mem::transmute(stream) };
301+
*client.subs.borrow_mut() += 1;
291302
(stream, unsub)
292303
});
293-
*client.subs.borrow_mut() += 1;
294304
SubscriptionResult { result, client }
295305
}
296306
}
@@ -319,6 +329,11 @@ impl PubsubPool {
319329
let _ = client.inner.shutdown().await;
320330
}
321331
}
332+
333+
#[inline]
334+
fn is_empty(&self) -> bool {
335+
self.clients.is_empty()
336+
}
322337
}
323338

324339
struct PubSubConnection {

0 commit comments

Comments
 (0)