diff --git a/Cargo.lock b/Cargo.lock index 5ff4a459f..1700adacd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3861,6 +3861,7 @@ dependencies = [ "magicblock-accounts-db", "magicblock-core", "magicblock-ledger", + "magicblock-magic-program-api", "magicblock-metrics", "magicblock-program", "parking_lot 0.12.4", diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 749489458..0cc2811a6 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -284,6 +284,7 @@ impl MagicValidator { txn_to_process_rx: validator_channels.transaction_to_process, account_update_tx: validator_channels.account_update, environment: build_svm_env(&accountsdb, latest_block.blockhash, 0), + tasks_tx: validator_channels.tasks_service, }; txn_scheduler_state .load_upgradeable_programs(&programs_to_load(&config.programs)) diff --git a/magicblock-core/src/link.rs b/magicblock-core/src/link.rs index 83be2f3f1..c429b6505 100644 --- a/magicblock-core/src/link.rs +++ b/magicblock-core/src/link.rs @@ -2,8 +2,8 @@ use accounts::{AccountUpdateRx, AccountUpdateTx}; use blocks::{BlockUpdateRx, BlockUpdateTx}; use tokio::sync::mpsc; use transactions::{ - TransactionSchedulerHandle, TransactionStatusRx, TransactionStatusTx, - TransactionToProcessRx, + ScheduledTasksRx, ScheduledTasksTx, TransactionSchedulerHandle, + TransactionStatusRx, TransactionStatusTx, TransactionToProcessRx, }; pub mod accounts; @@ -27,6 +27,8 @@ pub struct DispatchEndpoints { pub account_update: AccountUpdateRx, /// Receives notifications when a new block is produced. pub block_update: BlockUpdateRx, + /// Receives scheduled (crank) tasks from transactions executor. + pub tasks_service: ScheduledTasksRx, } /// A collection of channel endpoints for the **validator's internal core**. @@ -43,6 +45,8 @@ pub struct ValidatorChannelEndpoints { pub account_update: AccountUpdateTx, /// Sends notifications when a new block is produced to the pool of EventProcessor workers. pub block_update: BlockUpdateTx, + /// Sends scheduled (crank) tasks to tasks service from transactions executor. + pub tasks_service: ScheduledTasksTx, } /// Creates and connects the full set of communication channels between the dispatch @@ -58,6 +62,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { let (transaction_status_tx, transaction_status_rx) = flume::unbounded(); let (account_update_tx, account_update_rx) = flume::unbounded(); let (block_update_tx, block_update_rx) = flume::unbounded(); + let (tasks_tx, tasks_rx) = mpsc::unbounded_channel(); // Bounded channels for command queues where applying backpressure is important. let (txn_to_process_tx, txn_to_process_rx) = mpsc::channel(LINK_CAPACITY); @@ -68,6 +73,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { transaction_status: transaction_status_rx, account_update: account_update_rx, block_update: block_update_rx, + tasks_service: tasks_rx, }; // Bundle the corresponding channel ends for the validator's internal core. @@ -76,6 +82,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { transaction_status: transaction_status_tx, account_update: account_update_tx, block_update: block_update_tx, + tasks_service: tasks_tx, }; (dispatch, validator) diff --git a/magicblock-core/src/link/transactions.rs b/magicblock-core/src/link/transactions.rs index 779c871a7..ed4a3d271 100644 --- a/magicblock-core/src/link/transactions.rs +++ b/magicblock-core/src/link/transactions.rs @@ -1,4 +1,5 @@ use flume::{Receiver as MpmcReceiver, Sender as MpmcSender}; +use magicblock_magic_program_api::args::TaskRequest; use solana_program::message::{ inner_instruction::InnerInstructionsList, SimpleAddressLoader, }; @@ -11,7 +12,7 @@ use solana_transaction::{ use solana_transaction_context::TransactionReturnData; use solana_transaction_error::TransactionError; use tokio::sync::{ - mpsc::{Receiver, Sender}, + mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, oneshot, }; @@ -30,6 +31,11 @@ pub type TransactionToProcessRx = Receiver; /// The sender end of the channel used to send new transactions to the scheduler for processing. type TransactionToProcessTx = Sender; +/// The receiver end of the channel used to send scheduled tasks (cranking) +pub type ScheduledTasksRx = UnboundedReceiver; +/// The sender end of the channel used to send scheduled tasks (cranking) +pub type ScheduledTasksTx = UnboundedSender; + /// A cloneable handle that provides a high-level API for /// submitting transactions to the processing pipeline. /// diff --git a/magicblock-magic-program-api/src/args.rs b/magicblock-magic-program-api/src/args.rs index 322d50675..cb339e153 100644 --- a/magicblock-magic-program-api/src/args.rs +++ b/magicblock-magic-program-api/src/args.rs @@ -109,3 +109,31 @@ pub struct ScheduleTaskArgs { pub iterations: u64, pub instructions: Vec, } + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum TaskRequest { + Schedule(ScheduleTaskRequest), + Cancel(CancelTaskRequest), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ScheduleTaskRequest { + /// Unique identifier for this task + pub id: u64, + /// Unsigned instructions to execute when triggered + pub instructions: Vec, + /// Authority that can modify or cancel this task + pub authority: Pubkey, + /// How frequently the task should be executed, in milliseconds + pub execution_interval_millis: u64, + /// Number of times this task will be executed + pub iterations: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct CancelTaskRequest { + /// Unique identifier for the task to cancel + pub task_id: u64, + /// Authority that can cancel this task + pub authority: Pubkey, +} diff --git a/magicblock-magic-program-api/src/lib.rs b/magicblock-magic-program-api/src/lib.rs index b22b7e975..9aa584220 100644 --- a/magicblock-magic-program-api/src/lib.rs +++ b/magicblock-magic-program-api/src/lib.rs @@ -1,5 +1,6 @@ pub mod args; pub mod instruction; +pub mod tls; pub use solana_program::{declare_id, pubkey, pubkey::Pubkey}; diff --git a/magicblock-magic-program-api/src/tls.rs b/magicblock-magic-program-api/src/tls.rs new file mode 100644 index 000000000..8f2c6ba70 --- /dev/null +++ b/magicblock-magic-program-api/src/tls.rs @@ -0,0 +1,21 @@ +use std::{cell::RefCell, collections::VecDeque}; + +use crate::args::TaskRequest; + +#[derive(Default, Debug)] +pub struct ExecutionTlsStash { + pub tasks: VecDeque, + // TODO(bmuddha/taco-paco): intents should go in here + pub intents: VecDeque<()>, +} + +thread_local! { + pub static EXECUTION_TLS_STASH: RefCell = RefCell::default(); +} + +impl ExecutionTlsStash { + pub fn clear(&mut self) { + self.tasks.clear(); + self.intents.clear(); + } +} diff --git a/magicblock-processor/Cargo.toml b/magicblock-processor/Cargo.toml index 8aa007057..3577ed1f4 100644 --- a/magicblock-processor/Cargo.toml +++ b/magicblock-processor/Cargo.toml @@ -18,6 +18,7 @@ magicblock-core = { workspace = true } magicblock-ledger = { workspace = true } magicblock-metrics = { workspace = true } magicblock-program = { workspace = true } +magicblock-magic-program-api = { workspace = true } solana-account = { workspace = true } solana-bpf-loader-program = { workspace = true } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 08eb55c90..3aa47ef1b 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -5,7 +5,8 @@ use magicblock_accounts_db::{AccountsDb, StWLock}; use magicblock_core::link::{ accounts::AccountUpdateTx, transactions::{ - TransactionProcessingMode, TransactionStatusTx, TransactionToProcessRx, + ScheduledTasksTx, TransactionProcessingMode, TransactionStatusTx, + TransactionToProcessRx, }, }; use magicblock_ledger::{LatestBlock, LatestBlockInner, Ledger}; @@ -49,6 +50,8 @@ pub(super) struct TransactionExecutor { transaction_tx: TransactionStatusTx, /// A channel to send out account state updates after processing. accounts_tx: AccountUpdateTx, + /// A channel to send scheduled (crank) tasks created by transactions. + tasks_tx: ScheduledTasksTx, /// A back-channel to notify the `TransactionScheduler` that this worker is ready for more work. ready_tx: Sender, /// A read lock held during a slot's processing to synchronize with critical global @@ -99,6 +102,7 @@ impl TransactionExecutor { ready_tx, accounts_tx: state.account_update_tx.clone(), transaction_tx: state.transaction_status_tx.clone(), + tasks_tx: state.tasks_tx.clone(), }; this.processor.fill_missing_sysvar_cache_entries(&this); diff --git a/magicblock-processor/src/executor/processing.rs b/magicblock-processor/src/executor/processing.rs index 6bbd88098..ef4b78a37 100644 --- a/magicblock-processor/src/executor/processing.rs +++ b/magicblock-processor/src/executor/processing.rs @@ -1,4 +1,4 @@ -use log::error; +use log::{error, warn}; use magicblock_core::link::{ accounts::{AccountWithSlot, LockedAccount}, transactions::{ @@ -7,6 +7,7 @@ use magicblock_core::link::{ }, }; use magicblock_metrics::metrics::FAILED_TRANSACTIONS_COUNT; +use magicblock_program::tls::EXECUTION_TLS_STASH; use solana_pubkey::Pubkey; use solana_svm::{ account_loader::{AccountsBalances, CheckedTransactionDetails}, @@ -76,13 +77,28 @@ impl super::TransactionExecutor { // Otherwise commit the account state changes self.commit_accounts(feepayer, &processed, is_replay); - // And commit transaction to the ledger + // Commit transaction to the ledger and schedule tasks and intents (if any) if !is_replay { self.commit_transaction(txn, processed, balances); + // If the transaction succeeded, check for potential tasks/intents + // that may have been scheduled during the transaction execution + if result.is_ok() { + EXECUTION_TLS_STASH.with(|stash| { + for task in stash.borrow_mut().tasks.drain(..) { + // This is a best effort send, if the tasks service has terminated + // for some reason, logging is the best we can do at this point + let _ = self.tasks_tx.send(task).inspect_err(|_| + warn!("Scheduled tasks service has hung up and longer running") + ); + } + }); + } } result }); + // Make sure that no matter what happened to the transaction we clear the stash + EXECUTION_TLS_STASH.with(|s| s.borrow_mut().clear()); // Send the final result back to the caller if they are waiting. tx.map(|tx| tx.send(result)); @@ -128,6 +144,9 @@ impl super::TransactionExecutor { inner_instructions: None, }, }; + // Make sure that we clear the stash, so that simulations + // don't interfere with actual transaction executions + EXECUTION_TLS_STASH.with(|s| s.borrow_mut().clear()); let _ = tx.send(result); } diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index 531ac1ca5..bed813087 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -3,7 +3,9 @@ use std::sync::{Arc, OnceLock, RwLock}; use magicblock_accounts_db::AccountsDb; use magicblock_core::link::{ accounts::AccountUpdateTx, - transactions::{TransactionStatusTx, TransactionToProcessRx}, + transactions::{ + ScheduledTasksTx, TransactionStatusTx, TransactionToProcessRx, + }, }; use magicblock_ledger::Ledger; use solana_account::AccountSharedData; @@ -40,6 +42,8 @@ pub struct TransactionSchedulerState { pub account_update_tx: AccountUpdateTx, /// The channel for sending final transaction statuses to downstream consumers. pub transaction_status_tx: TransactionStatusTx, + /// A channel to send scheduled (crank) tasks created by transactions. + pub tasks_tx: ScheduledTasksTx, } impl TransactionSchedulerState { diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index f479e4409..76fb3a9c1 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -15,10 +15,10 @@ use magicblock_core::{ }; use magicblock_ledger::LatestBlock; use magicblock_program::{ + args::{CancelTaskRequest, ScheduleTaskRequest, TaskRequest}, instruction_utils::InstructionUtils, validator::{validator_authority, validator_authority_id}, - CancelTaskRequest, CrankTask, ScheduleTaskRequest, TaskContext, - TaskRequest, TASK_CONTEXT_PUBKEY, + CrankTask, TaskContext, TASK_CONTEXT_PUBKEY, }; use solana_sdk::{ account::ReadableAccount, instruction::Instruction, message::Message, diff --git a/programs/magicblock/src/lib.rs b/programs/magicblock/src/lib.rs index 2b40ffde8..bedcc414e 100644 --- a/programs/magicblock/src/lib.rs +++ b/programs/magicblock/src/lib.rs @@ -7,9 +7,7 @@ mod toggle_executable_check; pub use magic_context::MagicContext; pub mod magic_scheduled_base_intent; pub mod task_context; -pub use task_context::{ - CancelTaskRequest, CrankTask, ScheduleTaskRequest, TaskContext, TaskRequest, -}; +pub use task_context::{CrankTask, TaskContext}; pub mod magicblock_processor; pub mod test_utils; mod utils; diff --git a/programs/magicblock/src/schedule_task/process_cancel_task.rs b/programs/magicblock/src/schedule_task/process_cancel_task.rs index 3da2df793..0a3a5cc34 100644 --- a/programs/magicblock/src/schedule_task/process_cancel_task.rs +++ b/programs/magicblock/src/schedule_task/process_cancel_task.rs @@ -1,16 +1,16 @@ use std::collections::HashSet; +use magicblock_magic_program_api::args::{CancelTaskRequest, TaskRequest}; use solana_log_collector::ic_msg; use solana_program_runtime::invoke_context::InvokeContext; use solana_sdk::{instruction::InstructionError, pubkey::Pubkey}; use crate::{ schedule_task::utils::check_task_context_id, - task_context::{CancelTaskRequest, TaskContext}, + task_context::TaskContext, utils::accounts::{ get_instruction_account_with_idx, get_instruction_pubkey_with_idx, }, - TaskRequest, }; pub(crate) fn process_cancel_task( diff --git a/programs/magicblock/src/schedule_task/process_schedule_task.rs b/programs/magicblock/src/schedule_task/process_schedule_task.rs index a40935b20..febfc251b 100644 --- a/programs/magicblock/src/schedule_task/process_schedule_task.rs +++ b/programs/magicblock/src/schedule_task/process_schedule_task.rs @@ -1,18 +1,19 @@ use std::collections::HashSet; -use magicblock_magic_program_api::args::ScheduleTaskArgs; +use magicblock_magic_program_api::args::{ + ScheduleTaskArgs, ScheduleTaskRequest, TaskRequest, +}; use solana_log_collector::ic_msg; use solana_program_runtime::invoke_context::InvokeContext; use solana_sdk::{instruction::InstructionError, pubkey::Pubkey}; use crate::{ schedule_task::utils::check_task_context_id, - task_context::{ScheduleTaskRequest, TaskContext, MIN_EXECUTION_INTERVAL}, + task_context::{TaskContext, MIN_EXECUTION_INTERVAL}, utils::accounts::{ get_instruction_account_with_idx, get_instruction_pubkey_with_idx, }, validator::validator_authority_id, - TaskRequest, }; pub(crate) fn process_schedule_task( diff --git a/programs/magicblock/src/task_context.rs b/programs/magicblock/src/task_context.rs index fec6a5448..6aa4b0b01 100644 --- a/programs/magicblock/src/task_context.rs +++ b/programs/magicblock/src/task_context.rs @@ -1,6 +1,9 @@ use std::cell::RefCell; -use magicblock_magic_program_api::TASK_CONTEXT_SIZE; +use magicblock_magic_program_api::{ + args::{ScheduleTaskRequest, TaskRequest}, + TASK_CONTEXT_SIZE, +}; use serde::{Deserialize, Serialize}; use solana_sdk::{ account::{AccountSharedData, ReadableAccount}, @@ -10,34 +13,6 @@ use solana_sdk::{ pub const MIN_EXECUTION_INTERVAL: u64 = 10; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum TaskRequest { - Schedule(ScheduleTaskRequest), - Cancel(CancelTaskRequest), -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct ScheduleTaskRequest { - /// Unique identifier for this task - pub id: u64, - /// Unsigned instructions to execute when triggered - pub instructions: Vec, - /// Authority that can modify or cancel this task - pub authority: Pubkey, - /// How frequently the task should be executed, in milliseconds - pub execution_interval_millis: u64, - /// Number of times this task will be executed - pub iterations: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct CancelTaskRequest { - /// Unique identifier for the task to cancel - pub task_id: u64, - /// Authority that can cancel this task - pub authority: Pubkey, -} - #[derive(Debug, Default, Serialize, Deserialize)] pub struct TaskContext { /// List of requests diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index a69b204d8..555542ae9 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -121,6 +121,7 @@ impl ExecutionTestEnv { account_update_tx: validator_channels.account_update, transaction_status_tx: validator_channels.transaction_status, txn_to_process_rx: validator_channels.transaction_to_process, + tasks_tx: validator_channels.tasks_service, environment, };