diff --git a/Cargo.lock b/Cargo.lock index 5ff4a459f..036ffcc31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2248,6 +2248,7 @@ name = "guinea" version = "0.2.3" dependencies = [ "bincode", + "magicblock-magic-program-api", "serde", "solana-program", ] @@ -3861,6 +3862,7 @@ dependencies = [ "magicblock-accounts-db", "magicblock-core", "magicblock-ledger", + "magicblock-magic-program-api", "magicblock-metrics", "magicblock-program", "parking_lot 0.12.4", @@ -3949,19 +3951,25 @@ dependencies = [ "bincode", "chrono", "futures-util", + "guinea", "log", "magicblock-config", "magicblock-core", "magicblock-ledger", + "magicblock-magic-program-api", "magicblock-processor", "magicblock-program", "rusqlite", "serde", + "solana-account", "solana-program", + "solana-pubkey", "solana-pubsub-client", "solana-sdk", + "solana-signature", "solana-svm", "solana-timings", + "test-kit", "thiserror 1.0.69", "tokio", "tokio-util 0.7.15", diff --git a/magicblock-api/src/fund_account.rs b/magicblock-api/src/fund_account.rs index e4cec42b7..9bdb5696a 100644 --- a/magicblock-api/src/fund_account.rs +++ b/magicblock-api/src/fund_account.rs @@ -3,8 +3,7 @@ use std::path::Path; use magicblock_accounts_db::AccountsDb; use magicblock_core::traits::AccountsBank; use magicblock_magic_program_api as magic_program; -use magicblock_magic_program_api::TASK_CONTEXT_PUBKEY; -use magicblock_program::{MagicContext, TaskContext}; +use magicblock_program::MagicContext; use solana_sdk::{ account::{AccountSharedData, WritableAccount}, pubkey::Pubkey, @@ -86,18 +85,3 @@ pub(crate) fn fund_magic_context(accountsdb: &AccountsDb) { accountsdb .insert_account(&magic_program::MAGIC_CONTEXT_PUBKEY, &magic_context); } - -pub(crate) fn fund_task_context(accountsdb: &AccountsDb) { - fund_account_with_data( - accountsdb, - &TASK_CONTEXT_PUBKEY, - u64::MAX, - TaskContext::SIZE, - ); - let mut task_context = accountsdb - .get_account(&magic_program::TASK_CONTEXT_PUBKEY) - .unwrap(); - task_context.set_delegated(true); - accountsdb - .insert_account(&magic_program::TASK_CONTEXT_PUBKEY, &task_context); -} diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 749489458..df797f988 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -84,8 +84,7 @@ use crate::{ remote_cluster_from_remote, try_convert_accounts_config, }, fund_account::{ - fund_magic_context, fund_task_context, funded_faucet, - init_validator_identity, + fund_magic_context, funded_faucet, init_validator_identity, }, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, ledger::{ @@ -139,7 +138,7 @@ pub struct MagicValidator { block_udpate_tx: BlockUpdateTx, _metrics: Option<(MetricsService, tokio::task::JoinHandle<()>)>, claim_fees_task: ClaimFeesTask, - task_scheduler_handle: Option>, + task_scheduler: Option, } impl MagicValidator { @@ -203,7 +202,6 @@ impl MagicValidator { init_validator_identity(&accountsdb, &validator_pubkey); fund_magic_context(&accountsdb); - fund_task_context(&accountsdb); let faucet_keypair = funded_faucet(&accountsdb, ledger.ledger_path().as_path())?; @@ -235,7 +233,7 @@ impl MagicValidator { let accounts_config = try_get_remote_accounts_config(&config.accounts)?; - let (dispatch, validator_channels) = link(); + let (mut dispatch, validator_channels) = link(); let committor_persist_path = storage_path.join("committor_service.sqlite"); @@ -284,6 +282,7 @@ impl MagicValidator { txn_to_process_rx: validator_channels.transaction_to_process, account_update_tx: validator_channels.account_update, environment: build_svm_env(&accountsdb, latest_block.blockhash, 0), + tasks_tx: validator_channels.tasks_service, }; txn_scheduler_state .load_upgradeable_programs(&programs_to_load(&config.programs)) @@ -322,6 +321,26 @@ impl MagicValidator { .await?; let rpc_handle = tokio::spawn(rpc.run()); + let task_scheduler_db_path = + SchedulerDatabase::path(ledger.ledger_path().parent().expect( + "ledger_path didn't have a parent, should never happen", + )); + debug!( + "Task scheduler persists to: {}", + task_scheduler_db_path.display() + ); + let task_scheduler = TaskSchedulerService::new( + &task_scheduler_db_path, + &config.task_scheduler, + dispatch.transaction_scheduler.clone(), + dispatch + .tasks_service + .take() + .expect("tasks_service should be initialized"), + ledger.latest_block().clone(), + token.clone(), + )?; + Ok(Self { accountsdb, config, @@ -340,7 +359,7 @@ impl MagicValidator { identity: validator_pubkey, transaction_scheduler: dispatch.transaction_scheduler, block_udpate_tx: validator_channels.block_update, - task_scheduler_handle: None, + task_scheduler: Some(task_scheduler), }) } @@ -653,30 +672,26 @@ impl MagicValidator { self.ledger_truncator.start(); - let task_scheduler_db_path = - SchedulerDatabase::path(self.ledger.ledger_path().parent().expect( - "ledger_path didn't have a parent, should never happen", - )); - debug!( - "Task scheduler persists to: {}", - task_scheduler_db_path.display() - ); - let task_scheduler_handle = TaskSchedulerService::start( - &task_scheduler_db_path, - &self.config.task_scheduler, - self.accountsdb.clone(), - self.transaction_scheduler.clone(), - self.ledger.latest_block().clone(), - self.token.clone(), - )?; // TODO: we should shutdown gracefully. // This is discussed in this comment: // https://github.com/magicblock-labs/magicblock-validator/pull/493#discussion_r2324560798 // However there is no proper solution for this right now. // An issue to create a shutdown system is open here: // https://github.com/magicblock-labs/magicblock-validator/issues/524 - self.task_scheduler_handle = Some(tokio::spawn(async move { - match task_scheduler_handle.await { + let task_scheduler = self + .task_scheduler + .take() + .expect("task_scheduler should be initialized"); + tokio::spawn(async move { + let join_handle = match task_scheduler.start() { + Ok(join_handle) => join_handle, + Err(err) => { + error!("Failed to start task scheduler: {:?}", err); + error!("Exiting process..."); + std::process::exit(1); + } + }; + match join_handle.await { Ok(Ok(())) => {} Ok(Err(err)) => { error!("An error occurred while running the task scheduler: {:?}", err); @@ -689,7 +704,7 @@ impl MagicValidator { std::process::exit(1); } } - })); + }); validator::finished_starting_up(); Ok(()) diff --git a/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs b/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs index 5db5cea80..f596c6ad4 100644 --- a/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs +++ b/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs @@ -24,7 +24,6 @@ pub fn blacklisted_accounts( blacklisted_accounts.insert(magic_program::ID); blacklisted_accounts.insert(magic_program::MAGIC_CONTEXT_PUBKEY); - blacklisted_accounts.insert(magic_program::TASK_CONTEXT_PUBKEY); blacklisted_accounts.insert(*validator_id); blacklisted_accounts.insert(*faucet_id); blacklisted_accounts diff --git a/magicblock-committor-program/Cargo.toml b/magicblock-committor-program/Cargo.toml index 15164f42a..b37ba7c44 100644 --- a/magicblock-committor-program/Cargo.toml +++ b/magicblock-committor-program/Cargo.toml @@ -29,3 +29,5 @@ doctest = false [features] no-entrypoint = [] default = [] +custom-heap = [] +custom-panic = [] diff --git a/magicblock-config/src/lib.rs b/magicblock-config/src/lib.rs index 921da754b..6008040cf 100644 --- a/magicblock-config/src/lib.rs +++ b/magicblock-config/src/lib.rs @@ -268,10 +268,7 @@ mod tests { port: 9090, }, }, - task_scheduler: TaskSchedulerConfig { - reset: true, - millis_per_tick: 1000, - }, + task_scheduler: TaskSchedulerConfig { reset: true }, }; let original_config = config.clone(); let other = EphemeralConfig::default(); @@ -356,10 +353,7 @@ mod tests { port: 9090, }, }, - task_scheduler: TaskSchedulerConfig { - reset: true, - millis_per_tick: 1000, - }, + task_scheduler: TaskSchedulerConfig { reset: true }, }; config.merge(other.clone()); @@ -441,10 +435,7 @@ mod tests { port: 9090, }, }, - task_scheduler: TaskSchedulerConfig { - reset: true, - millis_per_tick: 2000, - }, + task_scheduler: TaskSchedulerConfig { reset: true }, }; let original_config = config.clone(); let other = EphemeralConfig { @@ -519,10 +510,7 @@ mod tests { port: 9090, }, }, - task_scheduler: TaskSchedulerConfig { - reset: true, - millis_per_tick: 1000, - }, + task_scheduler: TaskSchedulerConfig { reset: true }, }; config.merge(other); diff --git a/magicblock-config/src/task_scheduler.rs b/magicblock-config/src/task_scheduler.rs index 478d3f290..078e23286 100644 --- a/magicblock-config/src/task_scheduler.rs +++ b/magicblock-config/src/task_scheduler.rs @@ -5,7 +5,15 @@ use serde::{Deserialize, Serialize}; #[clap_prefix("task-scheduler")] #[clap_from_serde] #[derive( - Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Args, Mergeable, + Debug, + Default, + Clone, + Serialize, + Deserialize, + PartialEq, + Eq, + Args, + Mergeable, )] #[serde(deny_unknown_fields, rename_all = "kebab-case")] pub struct TaskSchedulerConfig { @@ -13,21 +21,4 @@ pub struct TaskSchedulerConfig { #[derive_env_var] #[serde(default)] pub reset: bool, - /// Determines how frequently the task scheduler will check for executable tasks. - #[derive_env_var] - #[serde(default = "default_millis_per_tick")] - pub millis_per_tick: u64, -} - -impl Default for TaskSchedulerConfig { - fn default() -> Self { - Self { - reset: bool::default(), - millis_per_tick: default_millis_per_tick(), - } - } -} - -fn default_millis_per_tick() -> u64 { - 200 } diff --git a/magicblock-config/tests/fixtures/11_everything-defined.toml b/magicblock-config/tests/fixtures/11_everything-defined.toml index 01273cc48..f78d3f850 100644 --- a/magicblock-config/tests/fixtures/11_everything-defined.toml +++ b/magicblock-config/tests/fixtures/11_everything-defined.toml @@ -50,4 +50,3 @@ system-metrics-tick-interval-secs = 10 [task-scheduler] reset = true -millis-per-tick = 1000 diff --git a/magicblock-config/tests/parse_config.rs b/magicblock-config/tests/parse_config.rs index e9bfd1bda..35f6fd8a7 100644 --- a/magicblock-config/tests/parse_config.rs +++ b/magicblock-config/tests/parse_config.rs @@ -282,10 +282,7 @@ fn test_everything_defined() { addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), }, }, - task_scheduler: TaskSchedulerConfig { - reset: true, - millis_per_tick: 1000, - }, + task_scheduler: TaskSchedulerConfig { reset: true }, } ); } diff --git a/magicblock-config/tests/read_config.rs b/magicblock-config/tests/read_config.rs index 84b865a6b..95375f0ab 100644 --- a/magicblock-config/tests/read_config.rs +++ b/magicblock-config/tests/read_config.rs @@ -142,7 +142,6 @@ fn test_load_local_dev_with_programs_toml_envs_override() { env::set_var("METRICS_SYSTEM_METRICS_TICK_INTERVAL_SECS", "10"); env::set_var("CLONE_AUTO_AIRDROP_LAMPORTS", "123"); env::set_var("TASK_SCHEDULER_RESET", "true"); - env::set_var("TASK_SCHEDULER_MILLIS_PER_TICK", "1000"); let config = parse_config_with_file(&config_file_dir); @@ -202,10 +201,7 @@ fn test_load_local_dev_with_programs_toml_envs_override() { }, system_metrics_tick_interval_secs: 10, }, - task_scheduler: TaskSchedulerConfig { - reset: true, - millis_per_tick: 1000, - }, + task_scheduler: TaskSchedulerConfig { reset: true }, } ); diff --git a/magicblock-core/src/link.rs b/magicblock-core/src/link.rs index 83be2f3f1..1950dd424 100644 --- a/magicblock-core/src/link.rs +++ b/magicblock-core/src/link.rs @@ -2,8 +2,8 @@ use accounts::{AccountUpdateRx, AccountUpdateTx}; use blocks::{BlockUpdateRx, BlockUpdateTx}; use tokio::sync::mpsc; use transactions::{ - TransactionSchedulerHandle, TransactionStatusRx, TransactionStatusTx, - TransactionToProcessRx, + ScheduledTasksRx, ScheduledTasksTx, TransactionSchedulerHandle, + TransactionStatusRx, TransactionStatusTx, TransactionToProcessRx, }; pub mod accounts; @@ -27,6 +27,8 @@ pub struct DispatchEndpoints { pub account_update: AccountUpdateRx, /// Receives notifications when a new block is produced. pub block_update: BlockUpdateRx, + /// Receives scheduled (crank) tasks from transactions executor. + pub tasks_service: Option, } /// A collection of channel endpoints for the **validator's internal core**. @@ -43,6 +45,8 @@ pub struct ValidatorChannelEndpoints { pub account_update: AccountUpdateTx, /// Sends notifications when a new block is produced to the pool of EventProcessor workers. pub block_update: BlockUpdateTx, + /// Sends scheduled (crank) tasks to tasks service from transactions executor. + pub tasks_service: ScheduledTasksTx, } /// Creates and connects the full set of communication channels between the dispatch @@ -58,6 +62,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { let (transaction_status_tx, transaction_status_rx) = flume::unbounded(); let (account_update_tx, account_update_rx) = flume::unbounded(); let (block_update_tx, block_update_rx) = flume::unbounded(); + let (tasks_tx, tasks_rx) = mpsc::unbounded_channel(); // Bounded channels for command queues where applying backpressure is important. let (txn_to_process_tx, txn_to_process_rx) = mpsc::channel(LINK_CAPACITY); @@ -68,6 +73,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { transaction_status: transaction_status_rx, account_update: account_update_rx, block_update: block_update_rx, + tasks_service: Some(tasks_rx), }; // Bundle the corresponding channel ends for the validator's internal core. @@ -76,6 +82,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { transaction_status: transaction_status_tx, account_update: account_update_tx, block_update: block_update_tx, + tasks_service: tasks_tx, }; (dispatch, validator) diff --git a/magicblock-core/src/link/transactions.rs b/magicblock-core/src/link/transactions.rs index 779c871a7..ed4a3d271 100644 --- a/magicblock-core/src/link/transactions.rs +++ b/magicblock-core/src/link/transactions.rs @@ -1,4 +1,5 @@ use flume::{Receiver as MpmcReceiver, Sender as MpmcSender}; +use magicblock_magic_program_api::args::TaskRequest; use solana_program::message::{ inner_instruction::InnerInstructionsList, SimpleAddressLoader, }; @@ -11,7 +12,7 @@ use solana_transaction::{ use solana_transaction_context::TransactionReturnData; use solana_transaction_error::TransactionError; use tokio::sync::{ - mpsc::{Receiver, Sender}, + mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, oneshot, }; @@ -30,6 +31,11 @@ pub type TransactionToProcessRx = Receiver; /// The sender end of the channel used to send new transactions to the scheduler for processing. type TransactionToProcessTx = Sender; +/// The receiver end of the channel used to send scheduled tasks (cranking) +pub type ScheduledTasksRx = UnboundedReceiver; +/// The sender end of the channel used to send scheduled tasks (cranking) +pub type ScheduledTasksTx = UnboundedSender; + /// A cloneable handle that provides a high-level API for /// submitting transactions to the processing pipeline. /// diff --git a/magicblock-magic-program-api/src/args.rs b/magicblock-magic-program-api/src/args.rs index 322d50675..1a4b58af1 100644 --- a/magicblock-magic-program-api/src/args.rs +++ b/magicblock-magic-program-api/src/args.rs @@ -109,3 +109,40 @@ pub struct ScheduleTaskArgs { pub iterations: u64, pub instructions: Vec, } + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum TaskRequest { + Schedule(ScheduleTaskRequest), + Cancel(CancelTaskRequest), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ScheduleTaskRequest { + /// Unique identifier for this task + pub id: u64, + /// Unsigned instructions to execute when triggered + pub instructions: Vec, + /// Authority that can modify or cancel this task + pub authority: Pubkey, + /// How frequently the task should be executed, in milliseconds + pub execution_interval_millis: u64, + /// Number of times this task will be executed + pub iterations: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct CancelTaskRequest { + /// Unique identifier for the task to cancel + pub task_id: u64, + /// Authority that can cancel this task + pub authority: Pubkey, +} + +impl TaskRequest { + pub fn id(&self) -> u64 { + match self { + Self::Schedule(request) => request.id, + Self::Cancel(request) => request.task_id, + } + } +} diff --git a/magicblock-magic-program-api/src/instruction.rs b/magicblock-magic-program-api/src/instruction.rs index 6ff29bee2..8cd525396 100644 --- a/magicblock-magic-program-api/src/instruction.rs +++ b/magicblock-magic-program-api/src/instruction.rs @@ -87,13 +87,6 @@ pub enum MagicBlockInstruction { task_id: u64, }, - /// Process all tasks - /// - /// # Account references - /// - **0.** `[SIGNER]` Validator authority - /// - **1.** `[WRITE]` Task context account - ProcessTasks, - /// Disables the executable check, needed to modify the data of a program /// in preparation to deploying it via LoaderV4 and to modify its authority. /// diff --git a/magicblock-magic-program-api/src/lib.rs b/magicblock-magic-program-api/src/lib.rs index b22b7e975..be1028a75 100644 --- a/magicblock-magic-program-api/src/lib.rs +++ b/magicblock-magic-program-api/src/lib.rs @@ -1,5 +1,6 @@ pub mod args; pub mod instruction; +pub mod tls; pub use solana_program::{declare_id, pubkey, pubkey::Pubkey}; @@ -14,13 +15,3 @@ pub const MAGIC_CONTEXT_PUBKEY: Pubkey = /// NOTE: the default max accumulated account size per transaction is 64MB. /// See: MAX_LOADED_ACCOUNTS_DATA_SIZE_BYTES inside program-runtime/src/compute_budget_processor.rs pub const MAGIC_CONTEXT_SIZE: usize = 1024 * 1024 * 5; // 5 MB - -pub const TASK_CONTEXT_PUBKEY: Pubkey = - pubkey!("TaskContext11111111111111111111111111111111"); - -/// Requests are ix data, so they cannot exceed ~1kB. -/// With 1000 schedules per slot, that's 1MB per slot. -/// The task scheduler ticking once every 4 slots, that's 4MB. -/// This can be drastically reduced once we have a channel to the transaction executor. -/// https://github.com/magicblock-labs/magicblock-validator/issues/523 -pub const TASK_CONTEXT_SIZE: usize = 1024 * 1024 * 4; // 4 MB diff --git a/magicblock-magic-program-api/src/tls.rs b/magicblock-magic-program-api/src/tls.rs new file mode 100644 index 000000000..464f02417 --- /dev/null +++ b/magicblock-magic-program-api/src/tls.rs @@ -0,0 +1,32 @@ +use std::{cell::RefCell, collections::VecDeque}; + +use crate::args::TaskRequest; + +#[derive(Default, Debug)] +pub struct ExecutionTlsStash { + tasks: VecDeque, + // TODO(bmuddha/taco-paco): intents should go in here + intents: VecDeque<()>, +} + +thread_local! { + static EXECUTION_TLS_STASH: RefCell = RefCell::default(); +} + +impl ExecutionTlsStash { + pub fn register_task(task: TaskRequest) { + EXECUTION_TLS_STASH + .with_borrow_mut(|stash| stash.tasks.push_back(task)); + } + + pub fn next_task() -> Option { + EXECUTION_TLS_STASH.with_borrow_mut(|stash| stash.tasks.pop_front()) + } + + pub fn clear() { + EXECUTION_TLS_STASH.with_borrow_mut(|stash| { + stash.tasks.clear(); + stash.intents.clear(); + }) + } +} diff --git a/magicblock-processor/Cargo.toml b/magicblock-processor/Cargo.toml index 8aa007057..3577ed1f4 100644 --- a/magicblock-processor/Cargo.toml +++ b/magicblock-processor/Cargo.toml @@ -18,6 +18,7 @@ magicblock-core = { workspace = true } magicblock-ledger = { workspace = true } magicblock-metrics = { workspace = true } magicblock-program = { workspace = true } +magicblock-magic-program-api = { workspace = true } solana-account = { workspace = true } solana-bpf-loader-program = { workspace = true } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 08eb55c90..3aa47ef1b 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -5,7 +5,8 @@ use magicblock_accounts_db::{AccountsDb, StWLock}; use magicblock_core::link::{ accounts::AccountUpdateTx, transactions::{ - TransactionProcessingMode, TransactionStatusTx, TransactionToProcessRx, + ScheduledTasksTx, TransactionProcessingMode, TransactionStatusTx, + TransactionToProcessRx, }, }; use magicblock_ledger::{LatestBlock, LatestBlockInner, Ledger}; @@ -49,6 +50,8 @@ pub(super) struct TransactionExecutor { transaction_tx: TransactionStatusTx, /// A channel to send out account state updates after processing. accounts_tx: AccountUpdateTx, + /// A channel to send scheduled (crank) tasks created by transactions. + tasks_tx: ScheduledTasksTx, /// A back-channel to notify the `TransactionScheduler` that this worker is ready for more work. ready_tx: Sender, /// A read lock held during a slot's processing to synchronize with critical global @@ -99,6 +102,7 @@ impl TransactionExecutor { ready_tx, accounts_tx: state.account_update_tx.clone(), transaction_tx: state.transaction_status_tx.clone(), + tasks_tx: state.tasks_tx.clone(), }; this.processor.fill_missing_sysvar_cache_entries(&this); diff --git a/magicblock-processor/src/executor/processing.rs b/magicblock-processor/src/executor/processing.rs index 6bbd88098..5f9dd3813 100644 --- a/magicblock-processor/src/executor/processing.rs +++ b/magicblock-processor/src/executor/processing.rs @@ -1,4 +1,4 @@ -use log::error; +use log::{error, warn}; use magicblock_core::link::{ accounts::{AccountWithSlot, LockedAccount}, transactions::{ @@ -7,6 +7,7 @@ use magicblock_core::link::{ }, }; use magicblock_metrics::metrics::FAILED_TRANSACTIONS_COUNT; +use magicblock_program::tls::ExecutionTlsStash; use solana_pubkey::Pubkey; use solana_svm::{ account_loader::{AccountsBalances, CheckedTransactionDetails}, @@ -76,13 +77,27 @@ impl super::TransactionExecutor { // Otherwise commit the account state changes self.commit_accounts(feepayer, &processed, is_replay); - // And commit transaction to the ledger + // Commit transaction to the ledger and schedule tasks + // TODO: send intents here as well once implemented if !is_replay { self.commit_transaction(txn, processed, balances); + // If the transaction succeeded, check for potential tasks/intents + // that may have been scheduled during the transaction execution + if result.is_ok() { + while let Some(task) = ExecutionTlsStash::next_task() { + // This is a best effort send, if the tasks service has terminated + // for some reason, logging is the best we can do at this point + let _ = self.tasks_tx.send(task).inspect_err(|_| + warn!("Scheduled tasks service has hung up and is no longer running") + ); + } + } } result }); + // Make sure that no matter what happened to the transaction we clear the stash + ExecutionTlsStash::clear(); // Send the final result back to the caller if they are waiting. tx.map(|tx| tx.send(result)); @@ -128,6 +143,9 @@ impl super::TransactionExecutor { inner_instructions: None, }, }; + // Make sure that we clear the stash, so that simulations + // don't interfere with actual transaction executions + ExecutionTlsStash::clear(); let _ = tx.send(result); } diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index 531ac1ca5..bed813087 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -3,7 +3,9 @@ use std::sync::{Arc, OnceLock, RwLock}; use magicblock_accounts_db::AccountsDb; use magicblock_core::link::{ accounts::AccountUpdateTx, - transactions::{TransactionStatusTx, TransactionToProcessRx}, + transactions::{ + ScheduledTasksTx, TransactionStatusTx, TransactionToProcessRx, + }, }; use magicblock_ledger::Ledger; use solana_account::AccountSharedData; @@ -40,6 +42,8 @@ pub struct TransactionSchedulerState { pub account_update_tx: AccountUpdateTx, /// The channel for sending final transaction statuses to downstream consumers. pub transaction_status_tx: TransactionStatusTx, + /// A channel to send scheduled (crank) tasks created by transactions. + pub tasks_tx: ScheduledTasksTx, } impl TransactionSchedulerState { diff --git a/magicblock-task-scheduler/Cargo.toml b/magicblock-task-scheduler/Cargo.toml index 9c1fcc9df..3fa924ce0 100644 --- a/magicblock-task-scheduler/Cargo.toml +++ b/magicblock-task-scheduler/Cargo.toml @@ -28,3 +28,11 @@ solana-pubsub-client = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true, features = ["time"] } + +[dev-dependencies] +magicblock-magic-program-api = { workspace = true } +test-kit = { workspace = true } +guinea = { workspace = true } +solana-account = { workspace = true } +solana-pubkey = { workspace = true } +solana-signature = { workspace = true } diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index f479e4409..8fa4463bf 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -1,30 +1,25 @@ use std::{ collections::HashMap, path::Path, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, }; use futures_util::StreamExt; use log::*; use magicblock_config::TaskSchedulerConfig; -use magicblock_core::{ - link::transactions::TransactionSchedulerHandle, traits::AccountsBank, +use magicblock_core::link::transactions::{ + ScheduledTasksRx, TransactionSchedulerHandle, }; use magicblock_ledger::LatestBlock; use magicblock_program::{ - instruction_utils::InstructionUtils, + args::{CancelTaskRequest, ScheduleTaskRequest, TaskRequest}, validator::{validator_authority, validator_authority_id}, - CancelTaskRequest, CrankTask, ScheduleTaskRequest, TaskContext, - TaskRequest, TASK_CONTEXT_PUBKEY, }; use solana_sdk::{ - account::ReadableAccount, instruction::Instruction, message::Message, - pubkey::Pubkey, signature::Signature, transaction::Transaction, + instruction::Instruction, message::Message, pubkey::Pubkey, + signature::Signature, transaction::Transaction, }; -use tokio::{select, time::Duration}; +use tokio::{select, task::JoinHandle, time::Duration}; use tokio_util::{ sync::CancellationToken, time::{delay_queue::Key, DelayQueue}, @@ -38,40 +33,36 @@ use crate::{ const NOOP_PROGRAM_ID: Pubkey = Pubkey::from_str_const("noopb9bkMVfRPU8AsbpTUg8AQkHtKwMYZiFUjNRtMmV"); -pub struct TaskSchedulerService { +pub struct TaskSchedulerService { /// Database for persisting tasks db: SchedulerDatabase, - /// Bank for executing tasks - bank: Arc, /// Used to send transactions for execution tx_scheduler: TransactionSchedulerHandle, + /// Used to receive scheduled tasks from the transaction executor + scheduled_tasks: ScheduledTasksRx, /// Provides latest blockhash for signing transactions block: LatestBlock, - /// Interval at which the task scheduler will check for requests in the context - tick_interval: Duration, /// Queue of tasks to execute task_queue: DelayQueue, /// Map of task IDs to their corresponding keys in the task queue task_queue_keys: HashMap, /// Counter used to make each transaction unique tx_counter: AtomicU64, + /// Token used to cancel the task scheduler + token: CancellationToken, } -unsafe impl Send for TaskSchedulerService {} -unsafe impl Sync for TaskSchedulerService {} -impl TaskSchedulerService { - pub fn start( +unsafe impl Send for TaskSchedulerService {} +unsafe impl Sync for TaskSchedulerService {} +impl TaskSchedulerService { + pub fn new( path: &Path, config: &TaskSchedulerConfig, - bank: Arc, tx_scheduler: TransactionSchedulerHandle, + scheduled_tasks: ScheduledTasksRx, block: LatestBlock, token: CancellationToken, - ) -> Result< - tokio::task::JoinHandle>, - TaskSchedulerError, - > { - debug!("Initializing task scheduler service"); + ) -> Result { if config.reset { match std::fs::remove_file(path) { Ok(_) => {} @@ -87,83 +78,77 @@ impl TaskSchedulerService { // Reschedule all persisted tasks let db = SchedulerDatabase::new(path)?; - let tasks = db.get_tasks()?; - let mut service = Self { + Ok(Self { db, - bank, tx_scheduler, + scheduled_tasks, block, - tick_interval: Duration::from_millis(config.millis_per_tick), task_queue: DelayQueue::new(), task_queue_keys: HashMap::new(), tx_counter: AtomicU64::default(), - }; + token, + }) + } + + pub fn start( + mut self, + ) -> TaskSchedulerResult>> { + let tasks = self.db.get_tasks()?; let now = chrono::Utc::now().timestamp_millis() as u64; - debug!("Task scheduler started at {}", now); + debug!( + "Task scheduler starting at {} with {} tasks", + now, + tasks.len() + ); for task in tasks { let next_execution = task.last_execution_millis + task.execution_interval_millis; let timeout = Duration::from_millis(next_execution.saturating_sub(now)); let task_id = task.id; - let key = service.task_queue.insert(task, timeout); - service.task_queue_keys.insert(task_id, key); + let key = self.task_queue.insert(task, timeout); + self.task_queue_keys.insert(task_id, key); } - Ok(tokio::spawn(service.run(token))) + Ok(tokio::spawn(async move { self.run().await })) } - fn process_context_requests( + fn process_request( &mut self, - requests: &Vec, - ) -> TaskSchedulerResult> { - let mut errors = Vec::with_capacity(requests.len()); - for request in requests { - match request { - TaskRequest::Schedule(schedule_request) => { - if let Err(e) = - self.process_schedule_request(schedule_request) - { - self.db.insert_failed_scheduling( - schedule_request.id, - format!("{:?}", e), - )?; - error!( - "Failed to process schedule request {}: {}", - schedule_request.id, e - ); - errors.push(e); - } + request: &TaskRequest, + ) -> TaskSchedulerResult> { + match request { + TaskRequest::Schedule(schedule_request) => { + if let Err(e) = self.register_task(schedule_request) { + self.db.insert_failed_scheduling( + schedule_request.id, + format!("{:?}", e), + )?; + error!( + "Failed to process schedule request {}: {}", + schedule_request.id, e + ); + + return Ok(Err(e)); } - TaskRequest::Cancel(cancel_request) => { - if let Err(e) = self.process_cancel_request(cancel_request) - { - self.db.insert_failed_scheduling( - cancel_request.task_id, - format!("{:?}", e), - )?; - error!( - "Failed to process cancel request for task {}: {}", - cancel_request.task_id, e - ); - errors.push(e); - } + } + TaskRequest::Cancel(cancel_request) => { + if let Err(e) = self.process_cancel_request(cancel_request) { + self.db.insert_failed_scheduling( + cancel_request.task_id, + format!("{:?}", e), + )?; + error!( + "Failed to process cancel request for task {}: {}", + cancel_request.task_id, e + ); + + return Ok(Err(e)); } - }; - } - - Ok(errors) - } - - fn process_schedule_request( - &mut self, - schedule_request: &ScheduleTaskRequest, - ) -> TaskSchedulerResult<()> { - // Convert request to task and register in database - let task = CrankTask::from(schedule_request); - self.register_task(&task)?; + } + }; - Ok(()) + Ok(Ok(())) } fn process_cancel_request( @@ -224,7 +209,7 @@ impl TaskSchedulerService { pub fn register_task( &mut self, - task: &CrankTask, + task: &ScheduleTaskRequest, ) -> TaskSchedulerResult<()> { let db_task = DbTask { id: task.id, @@ -261,11 +246,7 @@ impl TaskSchedulerService { Ok(()) } - async fn run( - mut self, - token: CancellationToken, - ) -> TaskSchedulerResult<()> { - let mut interval = tokio::time::interval(self.tick_interval); + pub async fn run(&mut self) -> TaskSchedulerResult<()> { loop { select! { Some(task) = self.task_queue.next() => { @@ -279,48 +260,19 @@ impl TaskSchedulerService { self.db.insert_failed_task(task.id, format!("{:?}", e))?; } } - _ = interval.tick() => { - // HACK: we deserialize the context on every tick avoid using geyser. This will be fixed once the channel to the transaction executor is implemented. - // Performance should not be too bad because the context should be small. - // https://github.com/magicblock-labs/magicblock-validator/issues/523 - - // Process any existing requests from the context - let Some(context_account) = self.bank.get_account(&TASK_CONTEXT_PUBKEY) else { - error!("Task context account not found"); - return Err(TaskSchedulerError::TaskContextNotFound); - }; - - let task_context = bincode::deserialize::(context_account.data()).unwrap_or_default(); - - if task_context.requests.is_empty() { - // Nothing to do because there are no requests in the context - continue; - } - - match self.process_context_requests(&task_context.requests) { - Ok(errors) => { - if !errors.is_empty() { - warn!("Failed to process {} requests out of {}", errors.len(), task_context.requests.len()); - } - - // All requests were processed, reset the context - if let Err(e) = self.process_transaction(vec![ - InstructionUtils::process_tasks_instruction( - &validator_authority_id(), - ), - ]).await { - error!("Failed to reset task context: {}", e); - return Err(e); - } - debug!("Processed {} requests", task_context.requests.len()); + Some(task) = self.scheduled_tasks.recv() => { + match self.process_request(&task) { + Ok(Err(e)) => { + warn!("Failed to process request ID={}: {e:?}", task.id()); } Err(e) => { - error!("Failed to process context requests: {}", e); + error!("Failed to process request: {}", e); return Err(e); } + _ => {} } } - _ = token.cancelled() => { + _ = self.token.cancelled() => { break; } } diff --git a/magicblock-task-scheduler/tests/service.rs b/magicblock-task-scheduler/tests/service.rs new file mode 100644 index 000000000..64a81b7cc --- /dev/null +++ b/magicblock-task-scheduler/tests/service.rs @@ -0,0 +1,215 @@ +use std::time::Duration; + +use guinea::GuineaInstruction; +use magicblock_config::TaskSchedulerConfig; +use magicblock_program::{ + args::ScheduleTaskArgs, + validator::{init_validator_authority_if_needed, validator_authority_id}, +}; +use magicblock_task_scheduler::{ + errors::TaskSchedulerResult, SchedulerDatabase, TaskSchedulerError, + TaskSchedulerService, +}; +use solana_account::ReadableAccount; +use solana_program::{ + instruction::{AccountMeta, Instruction}, + native_token::LAMPORTS_PER_SOL, +}; +use test_kit::{ExecutionTestEnv, Signer}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +type SetupResult = TaskSchedulerResult<( + ExecutionTestEnv, + CancellationToken, + JoinHandle>, +)>; + +fn setup() -> SetupResult { + let mut env = ExecutionTestEnv::new(); + + init_validator_authority_if_needed(env.payer.insecure_clone()); + // NOTE: validator authority is unique for all tests in this file, but the payer changes for each test + // Airdrop some SOL to the validator authority, which is used to pay task fees + env.fund_account(validator_authority_id(), LAMPORTS_PER_SOL); + + let token = CancellationToken::new(); + let task_scheduler_db_path = SchedulerDatabase::path( + env.ledger + .ledger_path() + .parent() + .expect("ledger_path didn't have a parent, should never happen"), + ); + let handle = TaskSchedulerService::new( + &task_scheduler_db_path, + &TaskSchedulerConfig::default(), + env.transaction_scheduler.clone(), + env.dispatch + .tasks_service + .take() + .expect("Tasks service should be initialized"), + env.ledger.latest_block().clone(), + token.clone(), + )? + .start()?; + + Ok((env, token, handle)) +} + +#[tokio::test] +pub async fn test_schedule_task() -> TaskSchedulerResult<()> { + let (env, token, handle) = setup()?; + + let account = + env.create_account_with_config(LAMPORTS_PER_SOL, 1, guinea::ID); + + // Schedule a task + let ix = Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::ScheduleTask(ScheduleTaskArgs { + task_id: 1, + execution_interval_millis: 10, + iterations: 1, + instructions: vec![Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::Increment, + vec![AccountMeta::new(account.pubkey(), false)], + )], + }), + vec![ + AccountMeta::new_readonly(magicblock_magic_program_api::ID, false), + AccountMeta::new(env.payer.pubkey(), true), + AccountMeta::new(account.pubkey(), false), + ], + ); + let txn = env.build_transaction(&[ix]); + let result = env.execute_transaction(txn).await; + assert!( + result.is_ok(), + "failed to execute schedule task transaction: {:?}", + result + ); + + // Wait until the task scheduler actually mutates the account (with an upper bound to avoid hangs) + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if env.get_account(account.pubkey()).data().first() == Some(&1) { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await + .expect("task scheduler never incremented the account within 1s"); + + token.cancel(); + handle.await.expect("task service join handle failed")?; + + Ok(()) +} + +#[tokio::test] +pub async fn test_cancel_task() -> TaskSchedulerResult<()> { + let (env, token, handle) = setup()?; + + let account = + env.create_account_with_config(LAMPORTS_PER_SOL, 1, guinea::ID); + + // Schedule a task + let task_id = 2; + let interval = 100; + let ix = Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::ScheduleTask(ScheduleTaskArgs { + task_id, + execution_interval_millis: interval, + iterations: 100, + instructions: vec![Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::Increment, + vec![AccountMeta::new(account.pubkey(), false)], + )], + }), + vec![ + AccountMeta::new_readonly(magicblock_magic_program_api::ID, false), + AccountMeta::new(env.payer.pubkey(), true), + AccountMeta::new(account.pubkey(), false), + ], + ); + let txn = env.build_transaction(&[ix]); + let result = env.execute_transaction(txn).await; + assert!( + result.is_ok(), + "failed to execute schedule task transaction: {:?}", + result + ); + + // Wait until we actually observe at least five executions + let executed_before_cancel = + tokio::time::timeout(Duration::from_millis(10 * interval), async { + loop { + if let Some(value) = + env.get_account(account.pubkey()).data().first() + { + if *value >= 5 { + break *value; + } + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await + .expect( + "task scheduler never reached five executions within 10 intervals", + ); + + // Cancel the task + let ix = Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::CancelTask(task_id), + vec![ + AccountMeta::new_readonly(magicblock_magic_program_api::ID, false), + AccountMeta::new(env.payer.pubkey(), true), + ], + ); + let txn = env.build_transaction(&[ix]); + let result = env.execute_transaction(txn).await; + assert!( + result.is_ok(), + "failed to execute cancel task transaction: {:?}", + result + ); + + let value_at_cancel = env + .get_account(account.pubkey()) + .data() + .first() + .copied() + .unwrap_or_default(); + assert!( + value_at_cancel >= executed_before_cancel, + "unexpected: value at cancellation ({}) < value when 5 executions were observed ({})", + value_at_cancel, + executed_before_cancel + ); + + // Ensure the scheduler stops issuing executions after cancellation + tokio::time::sleep(Duration::from_millis(2 * interval)).await; + + let value_after_cancel = env + .get_account(account.pubkey()) + .data() + .first() + .copied() + .unwrap_or_default(); + + assert_eq!( + value_after_cancel, value_at_cancel, + "task scheduler kept executing after cancellation" + ); + + token.cancel(); + handle.await.expect("task service join handle failed")?; + + Ok(()) +} diff --git a/programs/elfs/guinea.so b/programs/elfs/guinea.so index c60e6ed01..62c6cedb4 100755 Binary files a/programs/elfs/guinea.so and b/programs/elfs/guinea.so differ diff --git a/programs/guinea/Cargo.toml b/programs/guinea/Cargo.toml index 9885dbb73..faf9d0756 100644 --- a/programs/guinea/Cargo.toml +++ b/programs/guinea/Cargo.toml @@ -16,3 +16,4 @@ bincode = { workspace = true } serde = { workspace = true } solana-program = { workspace = true } +magicblock-magic-program-api = { workspace = true } diff --git a/programs/guinea/src/lib.rs b/programs/guinea/src/lib.rs index 190341e5c..c64284876 100644 --- a/programs/guinea/src/lib.rs +++ b/programs/guinea/src/lib.rs @@ -1,13 +1,17 @@ #![allow(unexpected_cfgs)] use core::slice; +use magicblock_magic_program_api::{ + args::ScheduleTaskArgs, instruction::MagicBlockInstruction, +}; use serde::{Deserialize, Serialize}; use solana_program::{ account_info::{next_account_info, AccountInfo}, declare_id, entrypoint::{self, ProgramResult}, + instruction::{AccountMeta, Instruction}, log, - program::set_return_data, + program::{invoke, set_return_data}, program_error::ProgramError, pubkey::Pubkey, }; @@ -20,8 +24,11 @@ pub enum GuineaInstruction { ComputeBalances, PrintSizes, WriteByteToData(u8), + Increment, Transfer(u64), Resize(usize), + ScheduleTask(ScheduleTaskArgs), + CancelTask(u64), } fn compute_balances(accounts: slice::Iter) { @@ -57,6 +64,18 @@ fn write_byte_to_data( Ok(()) } +fn increment(accounts: slice::Iter) -> ProgramResult { + for a in accounts { + let mut data = a.try_borrow_mut_data()?; + let first = + data.first_mut().ok_or(ProgramError::AccountDataTooSmall)?; + *first = first + .checked_add(1) + .ok_or(ProgramError::ArithmeticOverflow)?; + } + Ok(()) +} + fn transfer( mut accounts: slice::Iter, lamports: u64, @@ -80,6 +99,62 @@ fn transfer( Ok(()) } +fn schedule_task( + mut accounts: slice::Iter, + args: ScheduleTaskArgs, +) -> ProgramResult { + let magic_program_info = next_account_info(&mut accounts)?; + let payer_info = next_account_info(&mut accounts)?; + let counter_pda_info = next_account_info(&mut accounts)?; + + if magic_program_info.key != &magicblock_magic_program_api::ID { + return Err(ProgramError::InvalidAccountData); + } + + if !payer_info.is_signer { + return Err(ProgramError::MissingRequiredSignature); + } + + let ix = Instruction::new_with_bincode( + magicblock_magic_program_api::ID, + &MagicBlockInstruction::ScheduleTask(args), + vec![ + AccountMeta::new(*payer_info.key, true), + AccountMeta::new(*counter_pda_info.key, false), + ], + ); + + invoke(&ix, &[payer_info.clone(), counter_pda_info.clone()])?; + + Ok(()) +} + +fn cancel_task( + mut accounts: slice::Iter, + task_id: u64, +) -> ProgramResult { + let magic_program_info = next_account_info(&mut accounts)?; + let payer_info = next_account_info(&mut accounts)?; + + if magic_program_info.key != &magicblock_magic_program_api::ID { + return Err(ProgramError::InvalidAccountData); + } + + if !payer_info.is_signer { + return Err(ProgramError::MissingRequiredSignature); + } + + let ix = Instruction::new_with_bincode( + magicblock_magic_program_api::ID, + &MagicBlockInstruction::CancelTask { task_id }, + vec![AccountMeta::new(*payer_info.key, true)], + ); + + invoke(&ix, &[payer_info.clone()])?; + + Ok(()) +} + fn process_instruction( _program_id: &Pubkey, accounts: &[AccountInfo], @@ -100,8 +175,15 @@ fn process_instruction( GuineaInstruction::WriteByteToData(byte) => { write_byte_to_data(accounts, byte)? } + GuineaInstruction::Increment => increment(accounts)?, GuineaInstruction::Transfer(lamports) => transfer(accounts, lamports)?, GuineaInstruction::Resize(size) => resize_account(accounts, size)?, + GuineaInstruction::ScheduleTask(request) => { + schedule_task(accounts, request)? + } + GuineaInstruction::CancelTask(task_id) => { + cancel_task(accounts, task_id)? + } } Ok(()) } diff --git a/programs/magicblock/src/lib.rs b/programs/magicblock/src/lib.rs index 2b40ffde8..76d21bde8 100644 --- a/programs/magicblock/src/lib.rs +++ b/programs/magicblock/src/lib.rs @@ -6,10 +6,6 @@ mod schedule_transactions; mod toggle_executable_check; pub use magic_context::MagicContext; pub mod magic_scheduled_base_intent; -pub mod task_context; -pub use task_context::{ - CancelTaskRequest, CrankTask, ScheduleTaskRequest, TaskContext, TaskRequest, -}; pub mod magicblock_processor; pub mod test_utils; mod utils; diff --git a/programs/magicblock/src/magicblock_processor.rs b/programs/magicblock/src/magicblock_processor.rs index 60cc13486..433e3a6cf 100644 --- a/programs/magicblock/src/magicblock_processor.rs +++ b/programs/magicblock/src/magicblock_processor.rs @@ -5,9 +5,7 @@ use solana_sdk::program_utils::limited_deserialize; use crate::{ mutate_accounts::process_mutate_accounts, process_scheduled_commit_sent, - schedule_task::{ - process_cancel_task, process_process_tasks, process_schedule_task, - }, + schedule_task::{process_cancel_task, process_schedule_task}, schedule_transactions::{ process_accept_scheduled_commits, process_schedule_base_intent, process_schedule_commit, ProcessScheduleCommitOptions, @@ -73,7 +71,6 @@ declare_process_instruction!( CancelTask { task_id } => { process_cancel_task(signers, invoke_context, task_id) } - ProcessTasks => process_process_tasks(signers, invoke_context), DisableExecutableCheck => { process_toggle_executable_check(signers, invoke_context, false) } diff --git a/programs/magicblock/src/schedule_task/mod.rs b/programs/magicblock/src/schedule_task/mod.rs index 874d7a012..18252f7de 100644 --- a/programs/magicblock/src/schedule_task/mod.rs +++ b/programs/magicblock/src/schedule_task/mod.rs @@ -1,7 +1,5 @@ mod process_cancel_task; -mod process_process_tasks; mod process_schedule_task; -mod utils; + pub(crate) use process_cancel_task::*; -pub(crate) use process_process_tasks::*; pub(crate) use process_schedule_task::*; diff --git a/programs/magicblock/src/schedule_task/process_cancel_task.rs b/programs/magicblock/src/schedule_task/process_cancel_task.rs index 3da2df793..f8aeec33d 100644 --- a/programs/magicblock/src/schedule_task/process_cancel_task.rs +++ b/programs/magicblock/src/schedule_task/process_cancel_task.rs @@ -1,17 +1,14 @@ use std::collections::HashSet; +use magicblock_magic_program_api::{ + args::{CancelTaskRequest, TaskRequest}, + tls::ExecutionTlsStash, +}; use solana_log_collector::ic_msg; use solana_program_runtime::invoke_context::InvokeContext; use solana_sdk::{instruction::InstructionError, pubkey::Pubkey}; -use crate::{ - schedule_task::utils::check_task_context_id, - task_context::{CancelTaskRequest, TaskContext}, - utils::accounts::{ - get_instruction_account_with_idx, get_instruction_pubkey_with_idx, - }, - TaskRequest, -}; +use crate::utils::accounts::get_instruction_pubkey_with_idx; pub(crate) fn process_cancel_task( signers: HashSet, @@ -19,9 +16,6 @@ pub(crate) fn process_cancel_task( task_id: u64, ) -> Result<(), InstructionError> { const TASK_AUTHORITY_IDX: u16 = 0; - const TASK_CONTEXT_IDX: u16 = TASK_AUTHORITY_IDX + 1; - - check_task_context_id(invoke_context, TASK_CONTEXT_IDX)?; let transaction_context = &invoke_context.transaction_context.clone(); @@ -45,12 +39,8 @@ pub(crate) fn process_cancel_task( authority: *task_authority_pubkey, }; - // Get the task context account - let context_acc = get_instruction_account_with_idx( - transaction_context, - TASK_CONTEXT_IDX, - )?; - TaskContext::add_request(context_acc, TaskRequest::Cancel(cancel_request))?; + // Add cancel request to execution TLS stash + ExecutionTlsStash::register_task(TaskRequest::Cancel(cancel_request)); ic_msg!( invoke_context, @@ -63,9 +53,7 @@ pub(crate) fn process_cancel_task( #[cfg(test)] mod test { - use magicblock_magic_program_api::{ - instruction::MagicBlockInstruction, TASK_CONTEXT_PUBKEY, - }; + use magicblock_magic_program_api::instruction::MagicBlockInstruction; use solana_sdk::{ account::AccountSharedData, instruction::{AccountMeta, Instruction, InstructionError}, @@ -76,7 +64,6 @@ mod test { use crate::{ instruction_utils::InstructionUtils, test_utils::process_instruction, - TaskContext, }; #[test] @@ -86,20 +73,10 @@ mod test { let ix = InstructionUtils::cancel_task_instruction(&payer.pubkey(), task_id); - let transaction_accounts = vec![ - ( - payer.pubkey(), - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - TASK_CONTEXT_PUBKEY, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; + let transaction_accounts = vec![( + payer.pubkey(), + AccountSharedData::new(u64::MAX, 0, &system_program::id()), + )]; let expected_result = Ok(()); process_instruction( @@ -110,73 +87,21 @@ mod test { ); } - #[test] - fn fail_process_cancel_task_wrong_context() { - let payer = Keypair::new(); - let wrong_context = Keypair::new().pubkey(); - let task_id = 1; - - let account_metas = vec![ - AccountMeta::new(payer.pubkey(), true), - AccountMeta::new(wrong_context, false), - ]; - let ix = Instruction::new_with_bincode( - crate::id(), - &MagicBlockInstruction::CancelTask { task_id }, - account_metas, - ); - let transaction_accounts = vec![ - ( - payer.pubkey(), - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - wrong_context, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; - let expected_result = Err(InstructionError::MissingAccount); - - process_instruction( - &ix.data, - transaction_accounts, - ix.accounts, - expected_result, - ); - } - #[test] fn fail_unsigned_process_cancel_task() { let payer = Keypair::new(); let task_id = 1; - let account_metas = vec![ - AccountMeta::new(payer.pubkey(), false), - AccountMeta::new(TASK_CONTEXT_PUBKEY, false), - ]; + let account_metas = vec![AccountMeta::new(payer.pubkey(), false)]; let ix = Instruction::new_with_bincode( crate::id(), &MagicBlockInstruction::CancelTask { task_id }, account_metas, ); - let transaction_accounts = vec![ - ( - payer.pubkey(), - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - TASK_CONTEXT_PUBKEY, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; + let transaction_accounts = vec![( + payer.pubkey(), + AccountSharedData::new(u64::MAX, 0, &system_program::id()), + )]; let expected_result = Err(InstructionError::MissingRequiredSignature); process_instruction( diff --git a/programs/magicblock/src/schedule_task/process_process_tasks.rs b/programs/magicblock/src/schedule_task/process_process_tasks.rs deleted file mode 100644 index 7e690bcf3..000000000 --- a/programs/magicblock/src/schedule_task/process_process_tasks.rs +++ /dev/null @@ -1,214 +0,0 @@ -use std::collections::HashSet; - -use solana_log_collector::ic_msg; -use solana_program_runtime::invoke_context::InvokeContext; -use solana_sdk::{instruction::InstructionError, pubkey::Pubkey}; - -use crate::{ - schedule_task::utils::check_task_context_id, - task_context::TaskContext, - utils::accounts::{ - get_instruction_account_with_idx, get_instruction_pubkey_with_idx, - }, - validator::validator_authority_id, -}; - -pub(crate) fn process_process_tasks( - signers: HashSet, - invoke_context: &mut InvokeContext, -) -> Result<(), InstructionError> { - const PROCESSOR_AUTHORITY_IDX: u16 = 0; - const TASK_CONTEXT_IDX: u16 = PROCESSOR_AUTHORITY_IDX + 1; - - check_task_context_id(invoke_context, TASK_CONTEXT_IDX)?; - - let transaction_context = &invoke_context.transaction_context.clone(); - - // Validate that the task authority is a signer - let processor_authority_pubkey = get_instruction_pubkey_with_idx( - transaction_context, - PROCESSOR_AUTHORITY_IDX, - )?; - if !signers.contains(processor_authority_pubkey) { - ic_msg!( - invoke_context, - "ProcessTasks ERR: processor authority {} not in signers", - processor_authority_pubkey - ); - return Err(InstructionError::MissingRequiredSignature); - } - - // Validate that the processor authority is the validator authority - if processor_authority_pubkey.ne(&validator_authority_id()) { - ic_msg!( - invoke_context, - "ProcessTasks ERR: processor authority {} is not the validator authority", - processor_authority_pubkey - ); - return Err(InstructionError::MissingRequiredSignature); - } - - // Get the task context account - let context_acc = get_instruction_account_with_idx( - transaction_context, - TASK_CONTEXT_IDX, - )?; - TaskContext::clear_requests(context_acc)?; - - ic_msg!( - invoke_context, - "Successfully cleared requests from task context", - ); - - Ok(()) -} - -#[cfg(test)] -mod test { - use magicblock_magic_program_api::{ - instruction::MagicBlockInstruction, TASK_CONTEXT_PUBKEY, - }; - use solana_sdk::{ - account::AccountSharedData, - instruction::{AccountMeta, Instruction, InstructionError}, - signature::Keypair, - signer::Signer, - system_program, - }; - - use crate::{ - instruction_utils::InstructionUtils, - test_utils::process_instruction, - validator::{ - generate_validator_authority_if_needed, validator_authority_id, - }, - TaskContext, - }; - - #[test] - fn test_process_tasks() { - generate_validator_authority_if_needed(); - - let ix = InstructionUtils::process_tasks_instruction( - &validator_authority_id(), - ); - let transaction_accounts = vec![ - ( - validator_authority_id(), - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - TASK_CONTEXT_PUBKEY, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; - - process_instruction( - &ix.data, - transaction_accounts, - ix.accounts, - Ok(()), - ); - } - - #[test] - fn fail_process_tasks_wrong_context() { - generate_validator_authority_if_needed(); - - let ix = InstructionUtils::process_tasks_instruction( - &validator_authority_id(), - ); - let wrong_context = Keypair::new().pubkey(); - let transaction_accounts = vec![ - ( - validator_authority_id(), - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - wrong_context, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; - - process_instruction( - &ix.data, - transaction_accounts, - ix.accounts, - Err(InstructionError::MissingAccount), - ); - } - - #[test] - fn fail_process_tasks_wrong_authority() { - generate_validator_authority_if_needed(); - - let wrong_authority = Keypair::new().pubkey(); - let ix = InstructionUtils::process_tasks_instruction(&wrong_authority); - let transaction_accounts = vec![ - ( - wrong_authority, - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - TASK_CONTEXT_PUBKEY, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; - - process_instruction( - &ix.data, - transaction_accounts, - ix.accounts, - Err(InstructionError::MissingRequiredSignature), - ); - } - - #[test] - fn fail_unsigned_process_tasks() { - generate_validator_authority_if_needed(); - - let account_metas = vec![ - AccountMeta::new(validator_authority_id(), false), - AccountMeta::new(TASK_CONTEXT_PUBKEY, false), - ]; - let ix = Instruction::new_with_bincode( - crate::id(), - &MagicBlockInstruction::ProcessTasks, - account_metas, - ); - - let transaction_accounts = vec![ - ( - validator_authority_id(), - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - TASK_CONTEXT_PUBKEY, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; - - process_instruction( - &ix.data, - transaction_accounts, - ix.accounts, - Err(InstructionError::MissingRequiredSignature), - ); - } -} diff --git a/programs/magicblock/src/schedule_task/process_schedule_task.rs b/programs/magicblock/src/schedule_task/process_schedule_task.rs index a40935b20..825b6200d 100644 --- a/programs/magicblock/src/schedule_task/process_schedule_task.rs +++ b/programs/magicblock/src/schedule_task/process_schedule_task.rs @@ -1,34 +1,31 @@ use std::collections::HashSet; -use magicblock_magic_program_api::args::ScheduleTaskArgs; +use magicblock_magic_program_api::{ + args::{ScheduleTaskArgs, ScheduleTaskRequest, TaskRequest}, + tls::ExecutionTlsStash, +}; use solana_log_collector::ic_msg; use solana_program_runtime::invoke_context::InvokeContext; use solana_sdk::{instruction::InstructionError, pubkey::Pubkey}; use crate::{ - schedule_task::utils::check_task_context_id, - task_context::{ScheduleTaskRequest, TaskContext, MIN_EXECUTION_INTERVAL}, - utils::accounts::{ - get_instruction_account_with_idx, get_instruction_pubkey_with_idx, - }, + utils::accounts::get_instruction_pubkey_with_idx, validator::validator_authority_id, - TaskRequest, }; +const MIN_EXECUTION_INTERVAL: u64 = 10; + pub(crate) fn process_schedule_task( signers: HashSet, invoke_context: &mut InvokeContext, args: ScheduleTaskArgs, ) -> Result<(), InstructionError> { const PAYER_IDX: u16 = 0; - const TASK_CONTEXT_IDX: u16 = PAYER_IDX + 1; - - check_task_context_id(invoke_context, TASK_CONTEXT_IDX)?; let transaction_context = &invoke_context.transaction_context.clone(); let ix_ctx = transaction_context.get_current_instruction_context()?; let ix_accs_len = ix_ctx.get_number_of_instruction_accounts() as usize; - const ACCOUNTS_START: usize = TASK_CONTEXT_IDX as usize + 1; + const ACCOUNTS_START: usize = PAYER_IDX as usize + 1; // Assert MagicBlock program ix_ctx @@ -73,7 +70,7 @@ pub(crate) fn process_schedule_task( return Err(InstructionError::InvalidInstructionData); } - // Enforce minimal number of executions + // Enforce minimal number of instructions if args.instructions.is_empty() { ic_msg!( invoke_context, @@ -113,14 +110,8 @@ pub(crate) fn process_schedule_task( iterations: args.iterations, }; - let context_acc = get_instruction_account_with_idx( - transaction_context, - TASK_CONTEXT_IDX, - )?; - TaskContext::add_request( - context_acc, - TaskRequest::Schedule(schedule_request), - )?; + // Add schedule request to execution TLS stash + ExecutionTlsStash::register_task(TaskRequest::Schedule(schedule_request)); ic_msg!( invoke_context, @@ -133,9 +124,7 @@ pub(crate) fn process_schedule_task( #[cfg(test)] mod test { - use magicblock_magic_program_api::{ - instruction::MagicBlockInstruction, TASK_CONTEXT_PUBKEY, - }; + use magicblock_magic_program_api::instruction::MagicBlockInstruction; use solana_sdk::{ account::AccountSharedData, instruction::{AccountMeta, Instruction}, @@ -185,20 +174,10 @@ mod test { let pdas = (0..n_pdas) .map(|_| Keypair::new().pubkey()) .collect::>(); - let mut transaction_accounts = vec![ - ( - payer.pubkey(), - AccountSharedData::new(u64::MAX, 0, &system_program::id()), - ), - ( - TASK_CONTEXT_PUBKEY, - AccountSharedData::new( - u64::MAX, - TaskContext::SIZE, - &system_program::id(), - ), - ), - ]; + let mut transaction_accounts = vec![( + payer.pubkey(), + AccountSharedData::new(u64::MAX, 0, &system_program::id()), + )]; transaction_accounts.extend( pdas.iter() .map(|pda| { @@ -323,10 +302,7 @@ mod test { iterations: 1, instructions: vec![create_simple_ix()], }; - let account_metas = vec![ - AccountMeta::new(payer.pubkey(), false), - AccountMeta::new(TASK_CONTEXT_PUBKEY, false), - ]; + let account_metas = vec![AccountMeta::new(payer.pubkey(), false)]; let ix = Instruction::new_with_bincode( crate::id(), &MagicBlockInstruction::ScheduleTask(args), diff --git a/programs/magicblock/src/schedule_task/utils.rs b/programs/magicblock/src/schedule_task/utils.rs deleted file mode 100644 index fd01c557b..000000000 --- a/programs/magicblock/src/schedule_task/utils.rs +++ /dev/null @@ -1,26 +0,0 @@ -use magicblock_magic_program_api::TASK_CONTEXT_PUBKEY; -use solana_log_collector::ic_msg; -use solana_program_runtime::invoke_context::InvokeContext; -use solana_sdk::instruction::InstructionError; - -use crate::utils::accounts::get_instruction_pubkey_with_idx; - -pub(crate) fn check_task_context_id( - invoke_context: &InvokeContext, - idx: u16, -) -> Result<(), InstructionError> { - let provided_magic_context = get_instruction_pubkey_with_idx( - invoke_context.transaction_context, - idx, - )?; - if !provided_magic_context.eq(&TASK_CONTEXT_PUBKEY) { - ic_msg!( - invoke_context, - "ERR: invalid task context account {}", - provided_magic_context - ); - return Err(InstructionError::MissingAccount); - } - - Ok(()) -} diff --git a/programs/magicblock/src/task_context.rs b/programs/magicblock/src/task_context.rs deleted file mode 100644 index fec6a5448..000000000 --- a/programs/magicblock/src/task_context.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::cell::RefCell; - -use magicblock_magic_program_api::TASK_CONTEXT_SIZE; -use serde::{Deserialize, Serialize}; -use solana_sdk::{ - account::{AccountSharedData, ReadableAccount}, - instruction::{Instruction, InstructionError}, - pubkey::Pubkey, -}; - -pub const MIN_EXECUTION_INTERVAL: u64 = 10; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum TaskRequest { - Schedule(ScheduleTaskRequest), - Cancel(CancelTaskRequest), -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct ScheduleTaskRequest { - /// Unique identifier for this task - pub id: u64, - /// Unsigned instructions to execute when triggered - pub instructions: Vec, - /// Authority that can modify or cancel this task - pub authority: Pubkey, - /// How frequently the task should be executed, in milliseconds - pub execution_interval_millis: u64, - /// Number of times this task will be executed - pub iterations: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct CancelTaskRequest { - /// Unique identifier for the task to cancel - pub task_id: u64, - /// Authority that can cancel this task - pub authority: Pubkey, -} - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct TaskContext { - /// List of requests - pub requests: Vec, -} - -impl TaskContext { - pub const SIZE: usize = TASK_CONTEXT_SIZE; - pub const ZERO: [u8; Self::SIZE] = [0; Self::SIZE]; - - pub fn add_request( - context_acc: &RefCell, - request: TaskRequest, - ) -> Result<(), InstructionError> { - Self::update_context(context_acc, |context| { - context.requests.push(request) - }) - } - - pub fn clear_requests( - context_acc: &RefCell, - ) -> Result<(), InstructionError> { - Self::update_context(context_acc, |context| context.requests.clear()) - } - - fn update_context( - context_acc: &RefCell, - update_fn: impl FnOnce(&mut TaskContext), - ) -> Result<(), InstructionError> { - let mut context = Self::deserialize(&context_acc.borrow()) - .map_err(|_| InstructionError::GenericError)?; - update_fn(&mut context); - - let serialized_data = bincode::serialize(&context) - .map_err(|_| InstructionError::InvalidAccountData)?; - let mut context_data = context_acc.borrow_mut(); - context_data.resize(serialized_data.len(), 0); - context_data.set_data_from_slice(&serialized_data); - Ok(()) - } - - pub(crate) fn deserialize( - data: &AccountSharedData, - ) -> Result { - if data.data().is_empty() { - Ok(Self::default()) - } else { - data.deserialize_data() - } - } -} - -// Keep the old Task struct for backward compatibility and database storage -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct CrankTask { - /// Unique identifier for this task - pub id: u64, - /// Unsigned instructions to execute when triggered - pub instructions: Vec, - /// Authority that can modify or cancel this task - pub authority: Pubkey, - /// How frequently the task should be executed, in milliseconds - pub execution_interval_millis: u64, - /// Number of times this task will be executed - pub iterations: u64, -} - -impl CrankTask { - pub fn new( - id: u64, - instructions: Vec, - authority: Pubkey, - execution_interval_millis: u64, - iterations: u64, - ) -> Self { - Self { - id, - instructions, - authority, - execution_interval_millis, - iterations, - } - } -} - -impl From<&ScheduleTaskRequest> for CrankTask { - fn from(request: &ScheduleTaskRequest) -> Self { - Self { - id: request.id, - instructions: request.instructions.clone(), - authority: request.authority, - execution_interval_millis: request.execution_interval_millis, - iterations: request.iterations, - } - } -} diff --git a/programs/magicblock/src/utils/instruction_utils.rs b/programs/magicblock/src/utils/instruction_utils.rs index 2fc4a2275..95c00fa89 100644 --- a/programs/magicblock/src/utils/instruction_utils.rs +++ b/programs/magicblock/src/utils/instruction_utils.rs @@ -213,12 +213,7 @@ impl InstructionUtils { args: ScheduleTaskArgs, accounts: &[Pubkey], ) -> Instruction { - use magicblock_magic_program_api::TASK_CONTEXT_PUBKEY; - - let mut account_metas = vec![ - AccountMeta::new(*payer, true), - AccountMeta::new(TASK_CONTEXT_PUBKEY, false), - ]; + let mut account_metas = vec![AccountMeta::new(*payer, true)]; for account in accounts { account_metas.push(AccountMeta::new_readonly(*account, true)); } @@ -246,12 +241,7 @@ impl InstructionUtils { authority: &Pubkey, task_id: u64, ) -> Instruction { - use magicblock_magic_program_api::TASK_CONTEXT_PUBKEY; - - let account_metas = vec![ - AccountMeta::new(*authority, true), - AccountMeta::new(TASK_CONTEXT_PUBKEY, false), - ]; + let account_metas = vec![AccountMeta::new(*authority, true)]; Instruction::new_with_bincode( crate::id(), @@ -260,32 +250,6 @@ impl InstructionUtils { ) } - // ----------------- - // Process Tasks - // ----------------- - pub fn process_tasks( - authority: &Keypair, - recent_blockhash: Hash, - ) -> Transaction { - let ix = Self::process_tasks_instruction(&authority.pubkey()); - Self::into_transaction(authority, ix, recent_blockhash) - } - - pub fn process_tasks_instruction(authority: &Pubkey) -> Instruction { - use magicblock_magic_program_api::TASK_CONTEXT_PUBKEY; - - let account_metas = vec![ - AccountMeta::new(*authority, true), - AccountMeta::new(TASK_CONTEXT_PUBKEY, false), - ]; - - Instruction::new_with_bincode( - crate::id(), - &MagicBlockInstruction::ProcessTasks, - account_metas, - ) - } - // ----------------- // Executable Check // ----------------- diff --git a/programs/magicblock/src/validator.rs b/programs/magicblock/src/validator.rs index 8a750c95d..2af460fdb 100644 --- a/programs/magicblock/src/validator.rs +++ b/programs/magicblock/src/validator.rs @@ -51,6 +51,16 @@ pub fn init_validator_authority(keypair: Keypair) { validator_authority_lock.replace(keypair); } +pub fn init_validator_authority_if_needed(keypair: Keypair) { + let mut validator_authority_lock = VALIDATOR_AUTHORITY + .write() + .expect("RwLock VALIDATOR_AUTHORITY poisoned"); + if validator_authority_lock.as_ref().is_some() { + return; + } + validator_authority_lock.replace(keypair); +} + pub fn generate_validator_authority_if_needed() { let mut validator_authority_lock = VALIDATOR_AUTHORITY .write() diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 00b1f92e4..68a746ac7 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -2271,6 +2271,7 @@ name = "guinea" version = "0.2.3" dependencies = [ "bincode", + "magicblock-magic-program-api 0.2.3", "serde", "solana-program", ] @@ -3836,6 +3837,7 @@ dependencies = [ "magicblock-accounts-db", "magicblock-core", "magicblock-ledger", + "magicblock-magic-program-api 0.2.3", "magicblock-metrics", "magicblock-program", "parking_lot 0.12.4", diff --git a/test-integration/configs/schedule-task.ephem.toml b/test-integration/configs/schedule-task.ephem.toml index bf7e143f9..3f316cb32 100644 --- a/test-integration/configs/schedule-task.ephem.toml +++ b/test-integration/configs/schedule-task.ephem.toml @@ -17,5 +17,4 @@ port = 8899 port = 10001 [task-scheduler] -millis-per-tick = 50 reset = true diff --git a/test-integration/programs/flexi-counter/src/instruction.rs b/test-integration/programs/flexi-counter/src/instruction.rs index 1389f7641..2388a417d 100644 --- a/test-integration/programs/flexi-counter/src/instruction.rs +++ b/test-integration/programs/flexi-counter/src/instruction.rs @@ -417,7 +417,6 @@ pub fn create_intent_ix( #[allow(clippy::too_many_arguments)] pub fn create_schedule_task_ix( payer: Pubkey, - task_context: Pubkey, magic_program: Pubkey, task_id: u64, execution_interval_millis: u64, @@ -430,7 +429,6 @@ pub fn create_schedule_task_ix( let accounts = vec![ AccountMeta::new_readonly(magic_program, false), AccountMeta::new(payer, true), - AccountMeta::new(task_context, false), AccountMeta::new(pda, false), ]; Instruction::new_with_borsh( @@ -448,7 +446,6 @@ pub fn create_schedule_task_ix( pub fn create_cancel_task_ix( payer: Pubkey, - task_context: Pubkey, magic_program: Pubkey, task_id: u64, ) -> Instruction { @@ -456,7 +453,6 @@ pub fn create_cancel_task_ix( let accounts = vec![ AccountMeta::new_readonly(magic_program, false), AccountMeta::new(payer, true), - AccountMeta::new(task_context, false), ]; Instruction::new_with_borsh( *program_id, diff --git a/test-integration/programs/flexi-counter/src/processor.rs b/test-integration/programs/flexi-counter/src/processor.rs index a81021d9a..8bf826635 100644 --- a/test-integration/programs/flexi-counter/src/processor.rs +++ b/test-integration/programs/flexi-counter/src/processor.rs @@ -407,7 +407,6 @@ fn process_schedule_task( let account_info_iter = &mut accounts.iter(); let _magic_program_info = next_account_info(account_info_iter)?; let payer_info = next_account_info(account_info_iter)?; - let task_context_info = next_account_info(account_info_iter)?; let counter_pda_info = next_account_info(account_info_iter)?; let (counter_pda, bump) = FlexiCounter::pda(payer_info.key); @@ -443,18 +442,13 @@ fn process_schedule_task( &ix_data, vec![ AccountMeta::new(*payer_info.key, true), - AccountMeta::new(*task_context_info.key, false), AccountMeta::new(*counter_pda_info.key, true), ], ); invoke_signed( &ix, - &[ - payer_info.clone(), - task_context_info.clone(), - counter_pda_info.clone(), - ], + &[payer_info.clone(), counter_pda_info.clone()], &[&seeds], )?; @@ -470,7 +464,6 @@ fn process_cancel_task( let account_info_iter = &mut accounts.iter(); let _magic_program_info = next_account_info(account_info_iter)?; let payer_info = next_account_info(account_info_iter)?; - let task_context_info = next_account_info(account_info_iter)?; let ix_data = bincode::serialize(&MagicBlockInstruction::CancelTask { task_id: args.task_id, @@ -483,13 +476,10 @@ fn process_cancel_task( let ix = Instruction::new_with_bytes( MAGIC_PROGRAM_ID, &ix_data, - vec![ - AccountMeta::new(*payer_info.key, true), - AccountMeta::new(*task_context_info.key, false), - ], + vec![AccountMeta::new(*payer_info.key, true)], ); - invoke(&ix, &[payer_info.clone(), task_context_info.clone()])?; + invoke(&ix, &[payer_info.clone()])?; Ok(()) } diff --git a/test-integration/programs/schedulecommit/Cargo.toml b/test-integration/programs/schedulecommit/Cargo.toml index d850f923f..59c45c378 100644 --- a/test-integration/programs/schedulecommit/Cargo.toml +++ b/test-integration/programs/schedulecommit/Cargo.toml @@ -16,3 +16,5 @@ crate-type = ["cdylib", "lib"] no-entrypoint = [] cpi = ["no-entrypoint"] default = [] +custom-heap = [] +custom-panic = [] diff --git a/test-integration/test-ledger-restore/src/lib.rs b/test-integration/test-ledger-restore/src/lib.rs index c60734075..64acf8e38 100644 --- a/test-integration/test-ledger-restore/src/lib.rs +++ b/test-integration/test-ledger-restore/src/lib.rs @@ -152,10 +152,7 @@ pub fn setup_validator_with_local_remote_and_resume_strategy( }, accounts: accounts_config.clone(), programs, - task_scheduler: TaskSchedulerConfig { - reset: true, - ..Default::default() - }, + task_scheduler: TaskSchedulerConfig { reset: true }, ..Default::default() }; // Fund validator on chain diff --git a/test-integration/test-task-scheduler/src/lib.rs b/test-integration/test-task-scheduler/src/lib.rs index 36db600ea..a7b82983c 100644 --- a/test-integration/test-task-scheduler/src/lib.rs +++ b/test-integration/test-task-scheduler/src/lib.rs @@ -43,10 +43,7 @@ pub fn setup_validator() -> (TempDir, Child, IntegrationTestContext) { let config = EphemeralConfig { accounts: accounts_config, - task_scheduler: TaskSchedulerConfig { - reset: true, - millis_per_tick: TASK_SCHEDULER_TICK_MILLIS, - }, + task_scheduler: TaskSchedulerConfig { reset: true }, validator: ValidatorConfig { millis_per_slot: TASK_SCHEDULER_TICK_MILLIS, ..Default::default() diff --git a/test-integration/test-task-scheduler/tests/test_cancel_ongoing_task.rs b/test-integration/test-task-scheduler/tests/test_cancel_ongoing_task.rs index 9e389757e..7529d70cd 100644 --- a/test-integration/test-task-scheduler/tests/test_cancel_ongoing_task.rs +++ b/test-integration/test-task-scheduler/tests/test_cancel_ongoing_task.rs @@ -1,6 +1,6 @@ use cleanass::{assert, assert_eq}; use integration_test_tools::{expect, validator::cleanup}; -use magicblock_program::{ID as MAGIC_PROGRAM_ID, TASK_CONTEXT_PUBKEY}; +use magicblock_program::ID as MAGIC_PROGRAM_ID; use magicblock_task_scheduler::SchedulerDatabase; use program_flexi_counter::{ instruction::{create_cancel_task_ix, create_schedule_task_ix}, @@ -41,7 +41,6 @@ fn test_cancel_ongoing_task() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, execution_interval_millis, @@ -80,7 +79,6 @@ fn test_cancel_ongoing_task() { &mut Transaction::new_signed_with_payer( &[create_cancel_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, )], diff --git a/test-integration/test-task-scheduler/tests/test_reschedule_task.rs b/test-integration/test-task-scheduler/tests/test_reschedule_task.rs index 83bbc13d5..4d4c3b3cc 100644 --- a/test-integration/test-task-scheduler/tests/test_reschedule_task.rs +++ b/test-integration/test-task-scheduler/tests/test_reschedule_task.rs @@ -1,6 +1,6 @@ use cleanass::{assert, assert_eq}; use integration_test_tools::{expect, validator::cleanup}; -use magicblock_program::{ID as MAGIC_PROGRAM_ID, TASK_CONTEXT_PUBKEY}; +use magicblock_program::ID as MAGIC_PROGRAM_ID; use magicblock_task_scheduler::{db::DbTask, SchedulerDatabase}; use program_flexi_counter::{ instruction::{create_cancel_task_ix, create_schedule_task_ix}, @@ -41,7 +41,6 @@ fn test_reschedule_task() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, execution_interval_millis, @@ -77,7 +76,6 @@ fn test_reschedule_task() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, new_execution_interval_millis, @@ -169,7 +167,6 @@ fn test_reschedule_task() { &mut Transaction::new_signed_with_payer( &[create_cancel_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, )], diff --git a/test-integration/test-task-scheduler/tests/test_schedule_error.rs b/test-integration/test-task-scheduler/tests/test_schedule_error.rs index af629963e..06a6495fb 100644 --- a/test-integration/test-task-scheduler/tests/test_schedule_error.rs +++ b/test-integration/test-task-scheduler/tests/test_schedule_error.rs @@ -1,6 +1,6 @@ use cleanass::{assert, assert_eq}; use integration_test_tools::{expect, validator::cleanup}; -use magicblock_program::{ID as MAGIC_PROGRAM_ID, TASK_CONTEXT_PUBKEY}; +use magicblock_program::ID as MAGIC_PROGRAM_ID; use magicblock_task_scheduler::SchedulerDatabase; use program_flexi_counter::{ instruction::{create_cancel_task_ix, create_schedule_task_ix}, @@ -42,7 +42,6 @@ fn test_schedule_error() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, execution_interval_millis, @@ -128,7 +127,6 @@ fn test_schedule_error() { &mut Transaction::new_signed_with_payer( &[create_cancel_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, )], diff --git a/test-integration/test-task-scheduler/tests/test_schedule_task.rs b/test-integration/test-task-scheduler/tests/test_schedule_task.rs index 382978c00..c6bfd1de8 100644 --- a/test-integration/test-task-scheduler/tests/test_schedule_task.rs +++ b/test-integration/test-task-scheduler/tests/test_schedule_task.rs @@ -1,6 +1,6 @@ use cleanass::{assert, assert_eq}; use integration_test_tools::{expect, validator::cleanup}; -use magicblock_program::{ID as MAGIC_PROGRAM_ID, TASK_CONTEXT_PUBKEY}; +use magicblock_program::ID as MAGIC_PROGRAM_ID; use magicblock_task_scheduler::{db::DbTask, SchedulerDatabase}; use program_flexi_counter::{ instruction::{create_cancel_task_ix, create_schedule_task_ix}, @@ -41,7 +41,6 @@ fn test_schedule_task() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, execution_interval_millis, @@ -133,7 +132,6 @@ fn test_schedule_task() { &mut Transaction::new_signed_with_payer( &[create_cancel_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, )], diff --git a/test-integration/test-task-scheduler/tests/test_schedule_task_signed.rs b/test-integration/test-task-scheduler/tests/test_schedule_task_signed.rs index e85463ccf..69d190129 100644 --- a/test-integration/test-task-scheduler/tests/test_schedule_task_signed.rs +++ b/test-integration/test-task-scheduler/tests/test_schedule_task_signed.rs @@ -1,5 +1,5 @@ use integration_test_tools::{expect, validator::cleanup}; -use magicblock_program::{ID as MAGIC_PROGRAM_ID, TASK_CONTEXT_PUBKEY}; +use magicblock_program::ID as MAGIC_PROGRAM_ID; use program_flexi_counter::instruction::create_schedule_task_ix; use solana_sdk::{ instruction::InstructionError, @@ -37,7 +37,6 @@ fn test_schedule_task_signed() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, execution_interval_millis, diff --git a/test-integration/test-task-scheduler/tests/test_unauthorized_reschedule.rs b/test-integration/test-task-scheduler/tests/test_unauthorized_reschedule.rs index ccbb31c16..add2f4c99 100644 --- a/test-integration/test-task-scheduler/tests/test_unauthorized_reschedule.rs +++ b/test-integration/test-task-scheduler/tests/test_unauthorized_reschedule.rs @@ -1,6 +1,6 @@ use cleanass::{assert, assert_eq}; use integration_test_tools::{expect, validator::cleanup}; -use magicblock_program::{ID as MAGIC_PROGRAM_ID, TASK_CONTEXT_PUBKEY}; +use magicblock_program::ID as MAGIC_PROGRAM_ID; use magicblock_task_scheduler::{db::DbTask, SchedulerDatabase}; use program_flexi_counter::{ instruction::create_schedule_task_ix, state::FlexiCounter, @@ -46,7 +46,6 @@ fn test_unauthorized_reschedule() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, execution_interval_millis, @@ -82,7 +81,6 @@ fn test_unauthorized_reschedule() { &mut Transaction::new_signed_with_payer( &[create_schedule_task_ix( different_payer.pubkey(), - TASK_CONTEXT_PUBKEY, MAGIC_PROGRAM_ID, task_id, new_execution_interval_millis, diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index a69b204d8..7290f82f3 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -37,6 +37,9 @@ use solana_transaction::Transaction; use solana_transaction_status_client_types::TransactionStatusMeta; use tempfile::TempDir; +const NOOP_PROGRAM_ID: Pubkey = + Pubkey::from_str_const("noopb9bkMVfRPU8AsbpTUg8AQkHtKwMYZiFUjNRtMmV"); + /// A simulated validator backend for integration tests. /// /// This struct encapsulates all the core components of a validator, including @@ -121,6 +124,7 @@ impl ExecutionTestEnv { account_update_tx: validator_channels.account_update, transaction_status_tx: validator_channels.transaction_status, txn_to_process_rx: validator_channels.transaction_to_process, + tasks_tx: validator_channels.tasks_service, environment, }; @@ -131,6 +135,12 @@ impl ExecutionTestEnv { "../programs/elfs/guinea.so".into(), )]) .expect("failed to load test programs into test env"); + scheduler_state + .load_upgradeable_programs(&[( + NOOP_PROGRAM_ID, + "../test-integration/programs/noop/noop.so".into(), + )]) + .expect("failed to load test programs into test env"); // Start the transaction processing backend. TransactionScheduler::new(1, scheduler_state).spawn();