11use std:: {
2+ cell:: RefCell ,
23 cmp:: { max, min} ,
3- collections:: { hash_map:: Entry , HashMap } ,
4+ collections:: { hash_map:: Entry , BinaryHeap , HashMap } ,
45 future:: Future ,
56 pin:: Pin ,
7+ rc:: Rc ,
68 sync:: { Arc , RwLock } ,
79} ;
810
9- use futures_util:: { stream:: FuturesUnordered , Stream , StreamExt } ;
11+ use futures_util:: { stream:: FuturesUnordered , FutureExt , Stream , StreamExt } ;
1012use log:: * ;
1113use magicblock_metrics:: metrics;
1214use solana_account_decoder:: { UiAccount , UiAccountEncoding , UiDataSliceConfig } ;
@@ -41,43 +43,24 @@ pub enum RemoteAccountUpdatesShardError {
4143
4244pub struct RemoteAccountUpdatesShard {
4345 shard_id : String ,
44- url : String ,
45- commitment : Option < CommitmentLevel > ,
4646 monitoring_request_receiver : Receiver < ( Pubkey , bool ) > ,
4747 first_subscribed_slots : Arc < RwLock < HashMap < Pubkey , Slot > > > ,
4848 last_known_update_slots : Arc < RwLock < HashMap < Pubkey , Slot > > > ,
49+ pool : PubsubPool ,
4950}
5051
5152impl RemoteAccountUpdatesShard {
52- pub fn new (
53+ pub async fn new (
5354 shard_id : String ,
5455 url : String ,
5556 commitment : Option < CommitmentLevel > ,
5657 monitoring_request_receiver : Receiver < ( Pubkey , bool ) > ,
5758 first_subscribed_slots : Arc < RwLock < HashMap < Pubkey , Slot > > > ,
5859 last_known_update_slots : Arc < RwLock < HashMap < Pubkey , Slot > > > ,
59- ) -> Self {
60- Self {
61- shard_id,
62- url,
63- commitment,
64- monitoring_request_receiver,
65- first_subscribed_slots,
66- last_known_update_slots,
67- }
68- }
69-
70- pub async fn start_monitoring_request_processing (
71- & mut self ,
72- cancellation_token : CancellationToken ,
73- ) -> Result < ( ) , RemoteAccountUpdatesShardError > {
74- // Create a pubsub client
75- info ! ( "Shard {}: Starting" , self . shard_id) ;
76- let ws_url = self . url . as_str ( ) ;
60+ ) -> Result < Self , RemoteAccountUpdatesShardError > {
7761 // For every account, we only want the updates, not the actual content of the accounts
7862 let config = RpcAccountInfoConfig {
79- commitment : self
80- . commitment
63+ commitment : commitment
8164 . map ( |commitment| CommitmentConfig { commitment } ) ,
8265 encoding : Some ( UiAccountEncoding :: Base64 ) ,
8366 data_slice : Some ( UiDataSliceConfig {
@@ -86,15 +69,39 @@ impl RemoteAccountUpdatesShard {
8669 } ) ,
8770 min_context_slot : None ,
8871 } ;
89- let mut pool = PubsubPool :: new ( ws_url, config) . await ?;
90- // Subscribe to the clock from the RPC (to figure out the latest slot)
91- let mut clock_stream = pool. subscribe ( clock:: ID ) . await ?;
72+ // Create a pubsub client
73+ info ! ( "Shard {}: Starting" , shard_id) ;
74+ let pool = PubsubPool :: new ( & url, config) . await ?;
75+ Ok ( Self {
76+ shard_id,
77+ monitoring_request_receiver,
78+ first_subscribed_slots,
79+ last_known_update_slots,
80+ pool,
81+ } )
82+ }
83+
84+ pub async fn start_monitoring_request_processing (
85+ mut self ,
86+ cancellation_token : CancellationToken ,
87+ ) {
9288 let mut clock_slot = 0 ;
9389 // We'll store useful maps for each of the account subscriptions
9490 let mut account_streams = StreamMap :: new ( ) ;
9591 const LOG_CLOCK_FREQ : u64 = 100 ;
9692 let mut log_clock_count = 0 ;
93+ // Subscribe to the clock from the RPC (to figure out the latest slot)
94+ let subscription = self . pool . subscribe ( clock:: ID ) . await ;
95+ let Ok ( ( mut clock_stream, unsub) ) = subscription. result else {
96+ error ! ( "failed to subscribe to clock on shard: {}" , self . shard_id) ;
97+ return ;
98+ } ;
99+ self . pool
100+ . unsubscribes
101+ . insert ( clock:: ID , ( subscription. client . subs . clone ( ) , unsub) ) ;
102+ self . pool . clients . push ( subscription. client ) ;
97103
104+ let mut requests = FuturesUnordered :: new ( ) ;
98105 // Loop forever until we stop the worker
99106 loop {
100107 tokio:: select! {
@@ -122,24 +129,36 @@ impl RemoteAccountUpdatesShard {
122129 if unsub {
123130 account_streams. remove( & pubkey) ;
124131 metrics:: set_subscriptions_count( account_streams. len( ) , & self . shard_id) ;
125- pool. unsubscribe( & pubkey) . await ;
132+ self . pool. unsubscribe( & pubkey) ;
126133 continue ;
127134 }
128- if pool. subscribed( & pubkey) {
135+ if self . pool. subscribed( & pubkey) {
129136 continue ;
130137 }
138+ // spawn the actual subscription handling to a background
139+ // task, so that the select loop is not blocked by it
140+ let sub = self . pool. subscribe( pubkey) . map( move |stream| ( stream, pubkey) ) ;
141+ requests. push( sub) ;
142+ }
143+ Some ( ( result, pubkey) ) = requests. next( ) , if !requests. is_empty( ) => {
144+ let ( stream, unsub) = match result. result {
145+ Ok ( s) => s,
146+ Err ( e) => {
147+ warn!( "shard {} failed to websocket subscribe to {pubkey}: {e}" , self . shard_id) ;
148+ continue ;
149+ }
150+ } ;
151+ self . try_to_override_first_subscribed_slot( pubkey, clock_slot) ;
152+ self . pool. unsubscribes. insert( pubkey, ( result. client. subs. clone( ) , unsub) ) ;
153+ self . pool. clients. push( result. client) ;
154+ account_streams. insert( pubkey, stream) ;
131155 debug!(
132156 "Shard {}: Account monitoring started: {:?}, clock_slot: {:?}" ,
133157 self . shard_id,
134158 pubkey,
135159 clock_slot
136160 ) ;
137- let stream = pool
138- . subscribe( pubkey)
139- . await ?;
140- account_streams. insert( pubkey, stream) ;
141161 metrics:: set_subscriptions_count( account_streams. len( ) , & self . shard_id) ;
142- self . try_to_override_first_subscribed_slot( pubkey, clock_slot) ;
143162 }
144163 // When we receive an update from any account subscriptions
145164 Some ( ( pubkey, update) ) = account_streams. next( ) => {
@@ -159,10 +178,8 @@ impl RemoteAccountUpdatesShard {
159178 // Cleanup all subscriptions and wait for proper shutdown
160179 drop ( account_streams) ;
161180 drop ( clock_stream) ;
162- pool. shutdown ( ) . await ;
181+ self . pool . shutdown ( ) . await ;
163182 info ! ( "Shard {}: Stopped" , self . shard_id) ;
164- // Done
165- Ok ( ( ) )
166183 }
167184
168185 fn try_to_override_first_subscribed_slot (
@@ -223,8 +240,8 @@ impl RemoteAccountUpdatesShard {
223240}
224241
225242struct PubsubPool {
226- clients : Vec < PubSubConnection > ,
227- unsubscribes : HashMap < Pubkey , ( usize , BoxFn ) > ,
243+ clients : BinaryHeap < PubSubConnection > ,
244+ unsubscribes : HashMap < Pubkey , ( Rc < RefCell < usize > > , BoxFn ) > ,
228245 config : RpcAccountInfoConfig ,
229246}
230247
@@ -237,7 +254,7 @@ impl PubsubPool {
237254 // of connections per RPC upstream, we don't overcomplicate things
238255 // here, as the whole cloning pipeline will be rewritten quite soon
239256 const CONNECTIONS_PER_POOL : usize = 8 ;
240- let mut clients = Vec :: with_capacity ( CONNECTIONS_PER_POOL ) ;
257+ let mut clients = BinaryHeap :: with_capacity ( CONNECTIONS_PER_POOL ) ;
241258 let mut connections: FuturesUnordered < _ > = ( 0 ..CONNECTIONS_PER_POOL )
242259 . map ( |_| PubSubConnection :: new ( url) )
243260 . collect ( ) ;
@@ -251,68 +268,105 @@ impl PubsubPool {
251268 } )
252269 }
253270
254- async fn subscribe (
271+ fn subscribe (
255272 & mut self ,
256273 pubkey : Pubkey ,
257- ) -> Result < SubscriptionStream , RemoteAccountUpdatesShardError > {
258- let ( index, client) = self
259- . clients
260- . iter_mut ( )
261- . enumerate ( )
262- . min_by ( |a, b| a. 1 . subs . cmp ( & b. 1 . subs ) )
263- . expect ( "clients vec is always greater than 0" ) ;
264- let ( stream, unsubscribe) = client
265- . inner
266- . account_subscribe ( & pubkey, Some ( self . config . clone ( ) ) )
267- . await
268- . map_err ( RemoteAccountUpdatesShardError :: PubsubClientError ) ?;
269- client. subs += 1 ;
270- // SAFETY:
271- // we never drop the PubsubPool before the returned subscription stream
272- // so the lifetime of the stream can be safely extended to 'static
273- #[ allow( clippy:: missing_transmute_annotations) ]
274- let stream = unsafe { std:: mem:: transmute ( stream) } ;
275- self . unsubscribes . insert ( pubkey, ( index, unsubscribe) ) ;
276- Ok ( stream)
274+ ) -> impl Future < Output = SubscriptionResult > {
275+ let client = self . clients . pop ( ) . expect (
276+ "websocket connection pool always has at least one connection" ,
277+ ) ;
278+ let config = Some ( self . config . clone ( ) ) ;
279+ async move {
280+ let result = client
281+ . inner
282+ . account_subscribe ( & pubkey, config)
283+ . await
284+ . map_err ( RemoteAccountUpdatesShardError :: PubsubClientError )
285+ . map ( |( stream, unsub) | {
286+ // SAFETY:
287+ // we never drop the PubsubPool before the returned subscription stream
288+ // so the lifetime of the stream can be safely extended to 'static
289+ #[ allow( clippy:: missing_transmute_annotations) ]
290+ let stream = unsafe { std:: mem:: transmute ( stream) } ;
291+ ( stream, unsub)
292+ } ) ;
293+ * client. subs . borrow_mut ( ) += 1 ;
294+ SubscriptionResult { result, client }
295+ }
277296 }
278297
279- async fn unsubscribe ( & mut self , pubkey : & Pubkey ) {
280- let Some ( ( index , callback) ) = self . unsubscribes . remove ( pubkey) else {
298+ fn unsubscribe ( & mut self , pubkey : & Pubkey ) {
299+ let Some ( ( subs , callback) ) = self . unsubscribes . remove ( pubkey) else {
281300 return ;
282301 } ;
283- callback ( ) . await ;
284- let Some ( client) = self . clients . get_mut ( index) else {
285- return ;
286- } ;
287- client. subs = client. subs . saturating_sub ( 1 ) ;
302+ let count = * subs. borrow ( ) ;
303+ * subs. borrow_mut ( ) = count. saturating_sub ( 1 ) ;
304+ drop ( subs) ;
305+ tokio:: spawn ( callback ( ) ) ;
288306 }
289307
290- fn subscribed ( & mut self , pubkey : & Pubkey ) -> bool {
308+ fn subscribed ( & self , pubkey : & Pubkey ) -> bool {
291309 self . unsubscribes . contains_key ( pubkey)
292310 }
293311
294312 async fn shutdown ( & mut self ) {
295313 // Cleanup all subscriptions and wait for proper shutdown
296314 for ( pubkey, ( _, callback) ) in self . unsubscribes . drain ( ) {
297- info ! ( "Account monitoring killed: {:?}" , pubkey) ;
298- callback ( ) . await ;
315+ debug ! ( "Account monitoring killed: {:?}" , pubkey) ;
316+ tokio :: spawn ( callback ( ) ) ;
299317 }
300- for client in self . clients . drain ( .. ) {
318+ for client in self . clients . drain ( ) {
301319 let _ = client. inner . shutdown ( ) . await ;
302320 }
303321 }
304322}
305323
306324struct PubSubConnection {
307325 inner : PubsubClient ,
308- subs : usize ,
326+ subs : Rc < RefCell < usize > > ,
327+ }
328+
329+ impl PartialEq for PubSubConnection {
330+ fn eq ( & self , other : & Self ) -> bool {
331+ self . subs . eq ( & other. subs )
332+ }
333+ }
334+
335+ impl PartialOrd for PubSubConnection {
336+ fn partial_cmp ( & self , other : & Self ) -> Option < std:: cmp:: Ordering > {
337+ // NOTE: intentional reverse ordering for the use in the BinaryHeap
338+ Some ( other. subs . cmp ( & self . subs ) )
339+ }
340+ }
341+
342+ impl Eq for PubSubConnection { }
343+
344+ impl Ord for PubSubConnection {
345+ fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
346+ // NOTE: intentional reverse ordering for the use in the BinaryHeap
347+ other. subs . cmp ( & self . subs )
348+ }
309349}
310350
311351impl PubSubConnection {
312352 async fn new ( url : & str ) -> Result < Self , RemoteAccountUpdatesShardError > {
313353 let inner = PubsubClient :: new ( url)
314354 . await
315355 . map_err ( RemoteAccountUpdatesShardError :: PubsubClientError ) ?;
316- Ok ( Self { inner, subs : 0 } )
356+ Ok ( Self {
357+ inner,
358+ subs : Default :: default ( ) ,
359+ } )
317360 }
318361}
362+
363+ // SAFETY: the Rc<RefCell> used in the connection never escape outside of the Shard,
364+ // and the borrows are never held across the await points, thus these impls are safe
365+ unsafe impl Send for PubSubConnection { }
366+ unsafe impl Send for PubsubPool { }
367+ unsafe impl Send for RemoteAccountUpdatesShard { }
368+
369+ struct SubscriptionResult {
370+ result : Result < ( SubscriptionStream , BoxFn ) , RemoteAccountUpdatesShardError > ,
371+ client : PubSubConnection ,
372+ }
0 commit comments