8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

18 changes: 1 addition & 17 deletions magicblock-api/src/fund_account.rs
@@ -3,8 +3,7 @@ use std::path::Path;
use magicblock_accounts_db::AccountsDb;
use magicblock_core::traits::AccountsBank;
use magicblock_magic_program_api as magic_program;
-use magicblock_magic_program_api::TASK_CONTEXT_PUBKEY;
-use magicblock_program::{MagicContext, TaskContext};
+use magicblock_program::MagicContext;
use solana_sdk::{
account::{AccountSharedData, WritableAccount},
pubkey::Pubkey,
@@ -86,18 +85,3 @@ pub(crate) fn fund_magic_context(accountsdb: &AccountsDb) {
accountsdb
.insert_account(&magic_program::MAGIC_CONTEXT_PUBKEY, &magic_context);
}
-
-pub(crate) fn fund_task_context(accountsdb: &AccountsDb) {
-fund_account_with_data(
-accountsdb,
-&TASK_CONTEXT_PUBKEY,
-u64::MAX,
-TaskContext::SIZE,
-);
-let mut task_context = accountsdb
-.get_account(&magic_program::TASK_CONTEXT_PUBKEY)
-.unwrap();
-task_context.set_delegated(true);
-accountsdb
-.insert_account(&magic_program::TASK_CONTEXT_PUBKEY, &task_context);
-}
65 changes: 40 additions & 25 deletions magicblock-api/src/magic_validator.rs
@@ -84,8 +84,7 @@ use crate::{
remote_cluster_from_remote, try_convert_accounts_config,
},
fund_account::{
-fund_magic_context, fund_task_context, funded_faucet,
-init_validator_identity,
+fund_magic_context, funded_faucet, init_validator_identity,
},
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
ledger::{
@@ -139,7 +138,7 @@ pub struct MagicValidator {
block_udpate_tx: BlockUpdateTx,
_metrics: Option<(MetricsService, tokio::task::JoinHandle<()>)>,
claim_fees_task: ClaimFeesTask,
-task_scheduler_handle: Option<tokio::task::JoinHandle<()>>,
+task_scheduler: Option<TaskSchedulerService>,
}

impl MagicValidator {
@@ -203,7 +202,6 @@ impl MagicValidator {

init_validator_identity(&accountsdb, &validator_pubkey);
fund_magic_context(&accountsdb);
-fund_task_context(&accountsdb);

let faucet_keypair =
funded_faucet(&accountsdb, ledger.ledger_path().as_path())?;
@@ -235,7 +233,7 @@

let accounts_config = try_get_remote_accounts_config(&config.accounts)?;

-let (dispatch, validator_channels) = link();
+let (mut dispatch, validator_channels) = link();

let committor_persist_path =
storage_path.join("committor_service.sqlite");
@@ -284,6 +282,7 @@
txn_to_process_rx: validator_channels.transaction_to_process,
account_update_tx: validator_channels.account_update,
environment: build_svm_env(&accountsdb, latest_block.blockhash, 0),
+tasks_tx: validator_channels.tasks_service,
};
txn_scheduler_state
.load_upgradeable_programs(&programs_to_load(&config.programs))
@@ -322,6 +321,26 @@
.await?;
let rpc_handle = tokio::spawn(rpc.run());

+let task_scheduler_db_path =
+SchedulerDatabase::path(ledger.ledger_path().parent().expect(
+"ledger_path didn't have a parent, should never happen",
+));
+debug!(
+"Task scheduler persists to: {}",
+task_scheduler_db_path.display()
+);
+let task_scheduler = TaskSchedulerService::new(
+&task_scheduler_db_path,
+&config.task_scheduler,
+dispatch.transaction_scheduler.clone(),
+dispatch
+.tasks_service
+.take()
+.expect("tasks_service should be initialized"),
+ledger.latest_block().clone(),
+token.clone(),
+)?;
+
Ok(Self {
accountsdb,
config,
@@ -340,7 +359,7 @@
identity: validator_pubkey,
transaction_scheduler: dispatch.transaction_scheduler,
block_udpate_tx: validator_channels.block_update,
-task_scheduler_handle: None,
+task_scheduler: Some(task_scheduler),
})
}

@@ -653,30 +672,26 @@

self.ledger_truncator.start();

-let task_scheduler_db_path =
-SchedulerDatabase::path(self.ledger.ledger_path().parent().expect(
-"ledger_path didn't have a parent, should never happen",
-));
-debug!(
-"Task scheduler persists to: {}",
-task_scheduler_db_path.display()
-);
-let task_scheduler_handle = TaskSchedulerService::start(
-&task_scheduler_db_path,
-&self.config.task_scheduler,
-self.accountsdb.clone(),
-self.transaction_scheduler.clone(),
-self.ledger.latest_block().clone(),
-self.token.clone(),
-)?;
// TODO: we should shutdown gracefully.
// This is discussed in this comment:
// https://github.com/magicblock-labs/magicblock-validator/pull/493#discussion_r2324560798
// However there is no proper solution for this right now.
// An issue to create a shutdown system is open here:
// https://github.com/magicblock-labs/magicblock-validator/issues/524
-self.task_scheduler_handle = Some(tokio::spawn(async move {
-match task_scheduler_handle.await {
+let task_scheduler = self
+.task_scheduler
+.take()
+.expect("task_scheduler should be initialized");
+tokio::spawn(async move {
+let join_handle = match task_scheduler.start() {
+Ok(join_handle) => join_handle,
+Err(err) => {
+error!("Failed to start task scheduler: {:?}", err);
+error!("Exiting process...");
+std::process::exit(1);
+}
+};
+match join_handle.await {
Ok(Ok(())) => {}
Ok(Err(err)) => {
error!("An error occurred while running the task scheduler: {:?}", err);
@@ -689,7 +704,7 @@
std::process::exit(1);
}
}
-}));
+});

validator::finished_starting_up();
Ok(())
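The hunks above split the task scheduler lifecycle in two: TaskSchedulerService::new builds the service eagerly in MagicValidator::new (stored in the new task_scheduler: Option<TaskSchedulerService> field), while start() is deferred to MagicValidator::start, where it returns the JoinHandle that is then spawned and awaited. Below is a minimal sketch of that shape using stand-in types, not the real TaskSchedulerService API; only tokio is assumed.

use tokio::task::JoinHandle;

// Stand-in for TaskSchedulerService; the real service takes a database path,
// config, channel handles, the latest block, and a cancellation token.
struct SchedulerLike;

impl SchedulerLike {
    // new(): fallible construction at validator-init time, so setup errors
    // surface before the validator reports itself as started.
    fn new() -> Result<Self, String> {
        Ok(Self)
    }

    // start(): consumes the service and hands back the JoinHandle of the
    // background loop; the caller decides how to await it and map failures.
    fn start(self) -> Result<JoinHandle<Result<(), String>>, String> {
        Ok(tokio::spawn(async move { Ok(()) }))
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Held in an Option<_> field between new() and start(), as in the diff.
    let mut task_scheduler = Some(SchedulerLike::new()?);
    let service = task_scheduler.take().expect("initialized in new()");
    let handle = service.start()?;
    handle.await.map_err(|e| e.to_string())??;
    Ok(())
}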
1 change: 0 additions & 1 deletion magicblock-chainlink/src/chainlink/blacklisted_accounts.rs
@@ -24,7 +24,6 @@ pub fn blacklisted_accounts(

blacklisted_accounts.insert(magic_program::ID);
blacklisted_accounts.insert(magic_program::MAGIC_CONTEXT_PUBKEY);
-blacklisted_accounts.insert(magic_program::TASK_CONTEXT_PUBKEY);
blacklisted_accounts.insert(*validator_id);
blacklisted_accounts.insert(*faucet_id);
blacklisted_accounts
2 changes: 2 additions & 0 deletions magicblock-committor-program/Cargo.toml
@@ -29,3 +29,5 @@ doctest = false
[features]
no-entrypoint = []
default = []
+custom-heap = []
+custom-panic = []
20 changes: 4 additions & 16 deletions magicblock-config/src/lib.rs
@@ -268,10 +268,7 @@ mod tests {
port: 9090,
},
},
-task_scheduler: TaskSchedulerConfig {
-reset: true,
-millis_per_tick: 1000,
-},
+task_scheduler: TaskSchedulerConfig { reset: true },
};
let original_config = config.clone();
let other = EphemeralConfig::default();
@@ -356,10 +353,7 @@
port: 9090,
},
},
-task_scheduler: TaskSchedulerConfig {
-reset: true,
-millis_per_tick: 1000,
-},
+task_scheduler: TaskSchedulerConfig { reset: true },
};

config.merge(other.clone());
@@ -441,10 +435,7 @@
port: 9090,
},
},
-task_scheduler: TaskSchedulerConfig {
-reset: true,
-millis_per_tick: 2000,
-},
+task_scheduler: TaskSchedulerConfig { reset: true },
};
let original_config = config.clone();
let other = EphemeralConfig {
@@ -519,10 +510,7 @@
port: 9090,
},
},
-task_scheduler: TaskSchedulerConfig {
-reset: true,
-millis_per_tick: 1000,
-},
+task_scheduler: TaskSchedulerConfig { reset: true },
};

config.merge(other);
27 changes: 9 additions & 18 deletions magicblock-config/src/task_scheduler.rs
@@ -5,29 +5,20 @@ use serde::{Deserialize, Serialize};
#[clap_prefix("task-scheduler")]
#[clap_from_serde]
#[derive(
-Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Args, Mergeable,
+Debug,
+Default,
+Clone,
+Serialize,
+Deserialize,
+PartialEq,
+Eq,
+Args,
+Mergeable,
)]
#[serde(deny_unknown_fields, rename_all = "kebab-case")]
pub struct TaskSchedulerConfig {
/// If true, the task scheduler will reset the database on startup.
#[derive_env_var]
#[serde(default)]
pub reset: bool,
-/// Determines how frequently the task scheduler will check for executable tasks.
-#[derive_env_var]
-#[serde(default = "default_millis_per_tick")]
-pub millis_per_tick: u64,
}
-
-impl Default for TaskSchedulerConfig {
-fn default() -> Self {
-Self {
-reset: bool::default(),
-millis_per_tick: default_millis_per_tick(),
-}
-}
-}
-
-fn default_millis_per_tick() -> u64 {
-200
-}
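With millis_per_tick removed, TaskSchedulerConfig reduces to the single reset flag and Default can simply be derived. A serde-only sketch of the post-diff shape, assuming the serde (derive) and toml crates; the real struct additionally derives clap's Args and the project's Mergeable/clap_prefix machinery, omitted here so the example compiles standalone:

use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "kebab-case")]
pub struct TaskSchedulerConfig {
    /// If true, the task scheduler will reset the database on startup.
    #[serde(default)]
    pub reset: bool,
}

fn main() {
    // An empty TOML table now yields the derived default: reset = false.
    let cfg: TaskSchedulerConfig = toml::from_str("").unwrap();
    assert_eq!(cfg, TaskSchedulerConfig::default());
    assert!(!cfg.reset);
}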
1 change: 0 additions & 1 deletion (TOML config fixture)
@@ -50,4 +50,3 @@ system-metrics-tick-interval-secs = 10

[task-scheduler]
reset = true
-millis-per-tick = 1000
5 changes: 1 addition & 4 deletions magicblock-config/tests/parse_config.rs
@@ -282,10 +282,7 @@ fn test_everything_defined() {
addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
},
},
-task_scheduler: TaskSchedulerConfig {
-reset: true,
-millis_per_tick: 1000,
-},
+task_scheduler: TaskSchedulerConfig { reset: true },
}
);
}
6 changes: 1 addition & 5 deletions magicblock-config/tests/read_config.rs
@@ -142,7 +142,6 @@ fn test_load_local_dev_with_programs_toml_envs_override() {
env::set_var("METRICS_SYSTEM_METRICS_TICK_INTERVAL_SECS", "10");
env::set_var("CLONE_AUTO_AIRDROP_LAMPORTS", "123");
env::set_var("TASK_SCHEDULER_RESET", "true");
env::set_var("TASK_SCHEDULER_MILLIS_PER_TICK", "1000");

let config = parse_config_with_file(&config_file_dir);

@@ -202,10 +201,7 @@
},
system_metrics_tick_interval_secs: 10,
},
-task_scheduler: TaskSchedulerConfig {
-reset: true,
-millis_per_tick: 1000,
-},
+task_scheduler: TaskSchedulerConfig { reset: true },
}
);

11 changes: 9 additions & 2 deletions magicblock-core/src/link.rs
@@ -2,8 +2,8 @@ use accounts::{AccountUpdateRx, AccountUpdateTx};
use blocks::{BlockUpdateRx, BlockUpdateTx};
use tokio::sync::mpsc;
use transactions::{
-TransactionSchedulerHandle, TransactionStatusRx, TransactionStatusTx,
-TransactionToProcessRx,
+ScheduledTasksRx, ScheduledTasksTx, TransactionSchedulerHandle,
+TransactionStatusRx, TransactionStatusTx, TransactionToProcessRx,
};

pub mod accounts;
@@ -27,6 +27,8 @@ pub struct DispatchEndpoints {
pub account_update: AccountUpdateRx,
/// Receives notifications when a new block is produced.
pub block_update: BlockUpdateRx,
+/// Receives scheduled (crank) tasks from the transaction executor.
+pub tasks_service: Option<ScheduledTasksRx>,
}

/// A collection of channel endpoints for the **validator's internal core**.
@@ -43,6 +45,8 @@ pub struct ValidatorChannelEndpoints {
pub account_update: AccountUpdateTx,
/// Sends notifications when a new block is produced to the pool of EventProcessor workers.
pub block_update: BlockUpdateTx,
+/// Sends scheduled (crank) tasks from the transaction executor to the tasks service.
+pub tasks_service: ScheduledTasksTx,
}

/// Creates and connects the full set of communication channels between the dispatch
@@ -58,6 +62,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) {
let (transaction_status_tx, transaction_status_rx) = flume::unbounded();
let (account_update_tx, account_update_rx) = flume::unbounded();
let (block_update_tx, block_update_rx) = flume::unbounded();
+let (tasks_tx, tasks_rx) = mpsc::unbounded_channel();

// Bounded channels for command queues where applying backpressure is important.
let (txn_to_process_tx, txn_to_process_rx) = mpsc::channel(LINK_CAPACITY);
@@ -68,6 +73,7 @@
transaction_status: transaction_status_rx,
account_update: account_update_rx,
block_update: block_update_rx,
+tasks_service: Some(tasks_rx),
};

// Bundle the corresponding channel ends for the validator's internal core.
@@ -76,6 +82,7 @@
transaction_status: transaction_status_tx,
account_update: account_update_tx,
block_update: block_update_tx,
+tasks_service: tasks_tx,
};

(dispatch, validator)
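The new tasks link follows the same pattern as the other channels: ValidatorChannelEndpoints owns the sender, and DispatchEndpoints wraps the receiver in an Option so a single consumer (the TaskSchedulerService) can take() it out, which is also why magic_validator.rs now binds mut dispatch. A self-contained sketch of that pattern, with String standing in for the real scheduled-task payload:

use tokio::sync::mpsc;

// Placeholder for the payload actually carried by ScheduledTasksTx/Rx.
type ScheduledTask = String;

struct DispatchEndpoints {
    // Option<_> because an mpsc receiver cannot be cloned: the one consumer
    // takes it out, leaving None behind for everyone else.
    tasks_service: Option<mpsc::UnboundedReceiver<ScheduledTask>>,
}

#[tokio::main]
async fn main() {
    // Unbounded like the other notification links, presumably so the
    // transaction executor never blocks on the scheduler.
    let (tasks_tx, tasks_rx) = mpsc::unbounded_channel();
    let mut dispatch = DispatchEndpoints { tasks_service: Some(tasks_rx) };

    // Transaction-executor side: fire-and-forget send.
    tasks_tx.send("crank task".to_string()).unwrap();

    // Task-scheduler side: take sole ownership of the receiver, then drain.
    let mut rx = dispatch.tasks_service.take().expect("taken exactly once");
    assert_eq!(rx.recv().await.as_deref(), Some("crank task"));
}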