Skip to content

Commit 7f30e8e

Browse files
PubSub redesign from broadcast to multicast (#373)
This PR reworks the way events are delivered to websocket subscription handlers. The main changes: 1. Every subscriber registers itself in the global subscriptions database, along with event that it's interested in 2. Geyser subsystem, upon receiving a particular event checks whether there're active subscribers for the event, and sends it only to those, no filtering is required further. 3. Rust-Protobuf-Rust conversions are disabled to avoid unnecessary overhead 4. gRPC plugin is completely disabled to avoid unnecessary overhead <!-- greptile_comment --> ## Greptile Summary This PR implements a significant architectural change to the PubSub system, transitioning from a broadcast model to a multicast model for more efficient event delivery. The changes focus on targeted message delivery through a global subscriptions database. - Introduces `SubscriptionsDb` in `magicblock-geyser-plugin/src/types.rs` for centralized subscription management using concurrent hashmaps - Removes gRPC and protobuf conversion overhead by removing `conversions.rs` and simplifying message handling - Adds new `notification_builder.rs` with trait-based notification system for different event types - Reduces default channel capacity from 250,000 to 1,024 in `config.rs`, which may impact high-throughput scenarios - Adds comprehensive test suite in `test-integration/test-pubsub/` validating subscription functionality for accounts, logs, signatures, slots and programs <!-- /greptile_comment --> --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent fc447cc commit 7f30e8e

38 files changed

+1416
-2257
lines changed

Cargo.lock

Lines changed: 42 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

magicblock-accounts-db/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,8 @@ impl AccountsDb {
265265

266266
let used_storage = self.storage.utilized_mmap();
267267
if let Err(err) = self.snapshot_engine.snapshot(slot, used_storage) {
268-
error!(
269-
"error taking snapshot at {}, slot {slot}: {err}",
268+
warn!(
269+
"failed to take snapshot at {}, slot {slot}: {err}",
270270
self.snapshot_engine.database_path().display()
271271
);
272272
}

magicblock-api/src/magic_validator.rs

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ pub struct MagicValidator {
134134
sample_performance_service: Option<SamplePerformanceService>,
135135
commit_accounts_ticker: Option<tokio::task::JoinHandle<()>>,
136136
remote_account_fetcher_worker: Option<RemoteAccountFetcherWorker>,
137-
remote_account_fetcher_handle: Option<thread::JoinHandle<()>>,
137+
remote_account_fetcher_handle: Option<tokio::task::JoinHandle<()>>,
138138
remote_account_updates_worker: Option<RemoteAccountUpdatesWorker>,
139-
remote_account_updates_handle: Option<thread::JoinHandle<()>>,
139+
remote_account_updates_handle: Option<tokio::task::JoinHandle<()>>,
140140
remote_account_cloner_worker: Option<
141141
RemoteAccountClonerWorker<
142142
BankAccountProvider,
@@ -145,7 +145,7 @@ pub struct MagicValidator {
145145
AccountDumperBank,
146146
>,
147147
>,
148-
remote_account_cloner_handle: Option<thread::JoinHandle<()>>,
148+
remote_account_cloner_handle: Option<tokio::task::JoinHandle<()>>,
149149
accounts_manager: Arc<AccountsManager>,
150150
transaction_listener: GeyserTransactionNotifyListener,
151151
rpc_service: JsonRpcService,
@@ -692,15 +692,10 @@ impl MagicValidator {
692692
{
693693
let cancellation_token = self.token.clone();
694694
self.remote_account_fetcher_handle =
695-
Some(thread::spawn(move || {
696-
create_worker_runtime("remote_account_fetcher_worker")
697-
.block_on(async move {
698-
remote_account_fetcher_worker
699-
.start_fetch_request_processing(
700-
cancellation_token,
701-
)
702-
.await;
703-
});
695+
Some(tokio::spawn(async move {
696+
remote_account_fetcher_worker
697+
.start_fetch_request_processing(cancellation_token)
698+
.await;
704699
}));
705700
}
706701
}
@@ -711,15 +706,10 @@ impl MagicValidator {
711706
{
712707
let cancellation_token = self.token.clone();
713708
self.remote_account_updates_handle =
714-
Some(thread::spawn(move || {
715-
create_worker_runtime("remote_account_updates_worker")
716-
.block_on(async move {
717-
remote_account_updates_worker
718-
.start_monitoring_request_processing(
719-
cancellation_token,
720-
)
721-
.await
722-
});
709+
Some(tokio::spawn(async move {
710+
remote_account_updates_worker
711+
.start_monitoring_request_processing(cancellation_token)
712+
.await
723713
}));
724714
}
725715
}
@@ -735,15 +725,10 @@ impl MagicValidator {
735725

736726
let cancellation_token = self.token.clone();
737727
self.remote_account_cloner_handle =
738-
Some(thread::spawn(move || {
739-
create_worker_runtime("remote_account_cloner_worker")
740-
.block_on(async move {
741-
remote_account_cloner_worker
742-
.start_clone_request_processing(
743-
cancellation_token,
744-
)
745-
.await
746-
});
728+
Some(tokio::spawn(async move {
729+
remote_account_cloner_worker
730+
.start_clone_request_processing(cancellation_token)
731+
.await
747732
}));
748733
}
749734
Ok(())
@@ -801,11 +786,3 @@ fn programs_to_load(programs: &[ProgramConfig]) -> Vec<(Pubkey, String)> {
801786
.map(|program| (program.id, program.path.clone()))
802787
.collect()
803788
}
804-
805-
fn create_worker_runtime(thread_name: &str) -> tokio::runtime::Runtime {
806-
tokio::runtime::Builder::new_current_thread()
807-
.enable_all()
808-
.thread_name(thread_name)
809-
.build()
810-
.unwrap()
811-
}

magicblock-geyser-plugin/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ bs58 = { workspace = true }
1414
expiring-hashmap = { workspace = true }
1515
geyser-grpc-proto = { workspace = true }
1616
hostname = { workspace = true }
17+
flume = "0.11"
1718
log = { workspace = true }
1819
serde = { workspace = true }
1920
serde_json = { workspace = true }
2021
magicblock-transaction-status = { workspace = true }
22+
scc = "2.3"
2123
solana-geyser-plugin-interface = { workspace = true }
2224
solana-sdk = { workspace = true }
2325
spl-token-2022 = { workspace = true, features = ["no-entrypoint"] }
@@ -27,6 +29,7 @@ tokio-util = { workspace = true }
2729
tonic = { workspace = true, features = ["gzip", "tls", "tls-roots"] }
2830
tonic-health = { workspace = true }
2931

32+
3033
[build-dependencies]
3134
anyhow = { workspace = true }
3235
cargo-lock = { workspace = true }

magicblock-geyser-plugin/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub struct ConfigGrpc {
7070
}
7171

7272
const MAX_DECODING_MESSAGE_SIZE_DEFAULT: usize = 4 * 1024 * 1024;
73-
const CHANNEL_CAPACITY_DEFAULT: usize = 250_000;
73+
const CHANNEL_CAPACITY_DEFAULT: usize = 1024;
7474
const UNARY_CONCURRENCY_LIMIT_DEFAULT: usize = Semaphore::MAX_PERMITS;
7575

7676
impl Default for ConfigGrpc {

0 commit comments

Comments
 (0)