Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl MagicValidator {
txn_to_process_rx: validator_channels.transaction_to_process,
account_update_tx: validator_channels.account_update,
environment: build_svm_env(&accountsdb, latest_block.blockhash, 0),
tasks_tx: validator_channels.tasks_service,
};
txn_scheduler_state
.load_upgradeable_programs(&programs_to_load(&config.programs))
Expand Down
11 changes: 9 additions & 2 deletions magicblock-core/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use accounts::{AccountUpdateRx, AccountUpdateTx};
use blocks::{BlockUpdateRx, BlockUpdateTx};
use tokio::sync::mpsc;
use transactions::{
TransactionSchedulerHandle, TransactionStatusRx, TransactionStatusTx,
TransactionToProcessRx,
ScheduledTasksRx, ScheduledTasksTx, TransactionSchedulerHandle,
TransactionStatusRx, TransactionStatusTx, TransactionToProcessRx,
};

pub mod accounts;
Expand All @@ -27,6 +27,8 @@ pub struct DispatchEndpoints {
pub account_update: AccountUpdateRx,
/// Receives notifications when a new block is produced.
pub block_update: BlockUpdateRx,
/// Receives scheduled (crank) tasks from transactions executor.
pub tasks_service: ScheduledTasksRx,
}

/// A collection of channel endpoints for the **validator's internal core**.
Expand All @@ -43,6 +45,8 @@ pub struct ValidatorChannelEndpoints {
pub account_update: AccountUpdateTx,
/// Sends notifications when a new block is produced to the pool of EventProcessor workers.
pub block_update: BlockUpdateTx,
/// Sends scheduled (crank) tasks to tasks service from transactions executor.
pub tasks_service: ScheduledTasksTx,
}

/// Creates and connects the full set of communication channels between the dispatch
Expand All @@ -58,6 +62,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) {
let (transaction_status_tx, transaction_status_rx) = flume::unbounded();
let (account_update_tx, account_update_rx) = flume::unbounded();
let (block_update_tx, block_update_rx) = flume::unbounded();
let (tasks_tx, tasks_rx) = mpsc::unbounded_channel();

// Bounded channels for command queues where applying backpressure is important.
let (txn_to_process_tx, txn_to_process_rx) = mpsc::channel(LINK_CAPACITY);
Expand All @@ -68,6 +73,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) {
transaction_status: transaction_status_rx,
account_update: account_update_rx,
block_update: block_update_rx,
tasks_service: tasks_rx,
};

// Bundle the corresponding channel ends for the validator's internal core.
Expand All @@ -76,6 +82,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) {
transaction_status: transaction_status_tx,
account_update: account_update_tx,
block_update: block_update_tx,
tasks_service: tasks_tx,
};

(dispatch, validator)
Expand Down
8 changes: 7 additions & 1 deletion magicblock-core/src/link/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use flume::{Receiver as MpmcReceiver, Sender as MpmcSender};
use magicblock_magic_program_api::args::TaskRequest;
use solana_program::message::{
inner_instruction::InnerInstructionsList, SimpleAddressLoader,
};
Expand All @@ -11,7 +12,7 @@ use solana_transaction::{
use solana_transaction_context::TransactionReturnData;
use solana_transaction_error::TransactionError;
use tokio::sync::{
mpsc::{Receiver, Sender},
mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
oneshot,
};

Expand All @@ -30,6 +31,11 @@ pub type TransactionToProcessRx = Receiver<ProcessableTransaction>;
/// The sender end of the channel used to send new transactions to the scheduler for processing.
type TransactionToProcessTx = Sender<ProcessableTransaction>;

/// The receiver end of the channel used to send scheduled tasks (cranking)
pub type ScheduledTasksRx = UnboundedReceiver<TaskRequest>;
/// The sender end of the channel used to send scheduled tasks (cranking)
pub type ScheduledTasksTx = UnboundedSender<TaskRequest>;

/// A cloneable handle that provides a high-level API for
/// submitting transactions to the processing pipeline.
///
Expand Down
28 changes: 28 additions & 0 deletions magicblock-magic-program-api/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,31 @@ pub struct ScheduleTaskArgs {
pub iterations: u64,
pub instructions: Vec<Instruction>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaskRequest {
Schedule(ScheduleTaskRequest),
Cancel(CancelTaskRequest),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ScheduleTaskRequest {
/// Unique identifier for this task
pub id: u64,
/// Unsigned instructions to execute when triggered
pub instructions: Vec<Instruction>,
/// Authority that can modify or cancel this task
pub authority: Pubkey,
/// How frequently the task should be executed, in milliseconds
pub execution_interval_millis: u64,
/// Number of times this task will be executed
pub iterations: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CancelTaskRequest {
/// Unique identifier for the task to cancel
pub task_id: u64,
/// Authority that can cancel this task
pub authority: Pubkey,
}
1 change: 1 addition & 0 deletions magicblock-magic-program-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod args;
pub mod instruction;
pub mod tls;

pub use solana_program::{declare_id, pubkey, pubkey::Pubkey};

Expand Down
21 changes: 21 additions & 0 deletions magicblock-magic-program-api/src/tls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::{cell::RefCell, collections::VecDeque};

use crate::args::TaskRequest;

#[derive(Default, Debug)]
pub struct ExecutionTlsStash {
pub tasks: VecDeque<TaskRequest>,
// TODO(bmuddha/taco-paco): intents should go in here
pub intents: VecDeque<()>,
}

thread_local! {
pub static EXECUTION_TLS_STASH: RefCell<ExecutionTlsStash> = RefCell::default();
}

impl ExecutionTlsStash {
pub fn clear(&mut self) {
self.tasks.clear();
self.intents.clear();
}
}
1 change: 1 addition & 0 deletions magicblock-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ magicblock-core = { workspace = true }
magicblock-ledger = { workspace = true }
magicblock-metrics = { workspace = true }
magicblock-program = { workspace = true }
magicblock-magic-program-api = { workspace = true }

solana-account = { workspace = true }
solana-bpf-loader-program = { workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion magicblock-processor/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use magicblock_accounts_db::{AccountsDb, StWLock};
use magicblock_core::link::{
accounts::AccountUpdateTx,
transactions::{
TransactionProcessingMode, TransactionStatusTx, TransactionToProcessRx,
ScheduledTasksTx, TransactionProcessingMode, TransactionStatusTx,
TransactionToProcessRx,
},
};
use magicblock_ledger::{LatestBlock, LatestBlockInner, Ledger};
Expand Down Expand Up @@ -49,6 +50,8 @@ pub(super) struct TransactionExecutor {
transaction_tx: TransactionStatusTx,
/// A channel to send out account state updates after processing.
accounts_tx: AccountUpdateTx,
/// A channel to send scheduled (crank) tasks created by transactions.
tasks_tx: ScheduledTasksTx,
/// A back-channel to notify the `TransactionScheduler` that this worker is ready for more work.
ready_tx: Sender<WorkerId>,
/// A read lock held during a slot's processing to synchronize with critical global
Expand Down Expand Up @@ -99,6 +102,7 @@ impl TransactionExecutor {
ready_tx,
accounts_tx: state.account_update_tx.clone(),
transaction_tx: state.transaction_status_tx.clone(),
tasks_tx: state.tasks_tx.clone(),
};

this.processor.fill_missing_sysvar_cache_entries(&this);
Expand Down
23 changes: 21 additions & 2 deletions magicblock-processor/src/executor/processing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use log::error;
use log::{error, warn};
use magicblock_core::link::{
accounts::{AccountWithSlot, LockedAccount},
transactions::{
Expand All @@ -7,6 +7,7 @@ use magicblock_core::link::{
},
};
use magicblock_metrics::metrics::FAILED_TRANSACTIONS_COUNT;
use magicblock_program::tls::EXECUTION_TLS_STASH;
use solana_pubkey::Pubkey;
use solana_svm::{
account_loader::{AccountsBalances, CheckedTransactionDetails},
Expand Down Expand Up @@ -76,13 +77,28 @@ impl super::TransactionExecutor {
// Otherwise commit the account state changes
self.commit_accounts(feepayer, &processed, is_replay);

// And commit transaction to the ledger
// Commit transaction to the ledger and schedule tasks and intents (if any)
if !is_replay {
self.commit_transaction(txn, processed, balances);
// If the transaction succeeded, check for potential tasks/intents
// that may have been scheduled during the transaction execution
if result.is_ok() {
EXECUTION_TLS_STASH.with(|stash| {
for task in stash.borrow_mut().tasks.drain(..) {
// This is a best effort send, if the tasks service has terminated
// for some reason, logging is the best we can do at this point
let _ = self.tasks_tx.send(task).inspect_err(|_|
warn!("Scheduled tasks service has hung up and longer running")
);
}
});
}
}

result
});
// Make sure that no matter what happened to the transaction we clear the stash
EXECUTION_TLS_STASH.with(|s| s.borrow_mut().clear());

// Send the final result back to the caller if they are waiting.
tx.map(|tx| tx.send(result));
Expand Down Expand Up @@ -128,6 +144,9 @@ impl super::TransactionExecutor {
inner_instructions: None,
},
};
// Make sure that we clear the stash, so that simulations
// don't interfere with actual transaction executions
EXECUTION_TLS_STASH.with(|s| s.borrow_mut().clear());
let _ = tx.send(result);
}

Expand Down
6 changes: 5 additions & 1 deletion magicblock-processor/src/scheduler/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::sync::{Arc, OnceLock, RwLock};
use magicblock_accounts_db::AccountsDb;
use magicblock_core::link::{
accounts::AccountUpdateTx,
transactions::{TransactionStatusTx, TransactionToProcessRx},
transactions::{
ScheduledTasksTx, TransactionStatusTx, TransactionToProcessRx,
},
};
use magicblock_ledger::Ledger;
use solana_account::AccountSharedData;
Expand Down Expand Up @@ -40,6 +42,8 @@ pub struct TransactionSchedulerState {
pub account_update_tx: AccountUpdateTx,
/// The channel for sending final transaction statuses to downstream consumers.
pub transaction_status_tx: TransactionStatusTx,
/// A channel to send scheduled (crank) tasks created by transactions.
pub tasks_tx: ScheduledTasksTx,
}

impl TransactionSchedulerState {
Expand Down
4 changes: 2 additions & 2 deletions magicblock-task-scheduler/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use magicblock_core::{
};
use magicblock_ledger::LatestBlock;
use magicblock_program::{
args::{CancelTaskRequest, ScheduleTaskRequest, TaskRequest},
instruction_utils::InstructionUtils,
validator::{validator_authority, validator_authority_id},
CancelTaskRequest, CrankTask, ScheduleTaskRequest, TaskContext,
TaskRequest, TASK_CONTEXT_PUBKEY,
CrankTask, TaskContext, TASK_CONTEXT_PUBKEY,
};
use solana_sdk::{
account::ReadableAccount, instruction::Instruction, message::Message,
Expand Down
4 changes: 1 addition & 3 deletions programs/magicblock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ mod toggle_executable_check;
pub use magic_context::MagicContext;
pub mod magic_scheduled_base_intent;
pub mod task_context;
pub use task_context::{
CancelTaskRequest, CrankTask, ScheduleTaskRequest, TaskContext, TaskRequest,
};
pub use task_context::{CrankTask, TaskContext};
pub mod magicblock_processor;
pub mod test_utils;
mod utils;
Expand Down
4 changes: 2 additions & 2 deletions programs/magicblock/src/schedule_task/process_cancel_task.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::collections::HashSet;

use magicblock_magic_program_api::args::{CancelTaskRequest, TaskRequest};
use solana_log_collector::ic_msg;
use solana_program_runtime::invoke_context::InvokeContext;
use solana_sdk::{instruction::InstructionError, pubkey::Pubkey};

use crate::{
schedule_task::utils::check_task_context_id,
task_context::{CancelTaskRequest, TaskContext},
task_context::TaskContext,
utils::accounts::{
get_instruction_account_with_idx, get_instruction_pubkey_with_idx,
},
TaskRequest,
};

pub(crate) fn process_cancel_task(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::collections::HashSet;

use magicblock_magic_program_api::args::ScheduleTaskArgs;
use magicblock_magic_program_api::args::{
ScheduleTaskArgs, ScheduleTaskRequest, TaskRequest,
};
use solana_log_collector::ic_msg;
use solana_program_runtime::invoke_context::InvokeContext;
use solana_sdk::{instruction::InstructionError, pubkey::Pubkey};

use crate::{
schedule_task::utils::check_task_context_id,
task_context::{ScheduleTaskRequest, TaskContext, MIN_EXECUTION_INTERVAL},
task_context::{TaskContext, MIN_EXECUTION_INTERVAL},
utils::accounts::{
get_instruction_account_with_idx, get_instruction_pubkey_with_idx,
},
validator::validator_authority_id,
TaskRequest,
};

pub(crate) fn process_schedule_task(
Expand Down
33 changes: 4 additions & 29 deletions programs/magicblock/src/task_context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::cell::RefCell;

use magicblock_magic_program_api::TASK_CONTEXT_SIZE;
use magicblock_magic_program_api::{
args::{ScheduleTaskRequest, TaskRequest},
TASK_CONTEXT_SIZE,
};
use serde::{Deserialize, Serialize};
use solana_sdk::{
account::{AccountSharedData, ReadableAccount},
Expand All @@ -10,34 +13,6 @@ use solana_sdk::{

pub const MIN_EXECUTION_INTERVAL: u64 = 10;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaskRequest {
Schedule(ScheduleTaskRequest),
Cancel(CancelTaskRequest),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ScheduleTaskRequest {
/// Unique identifier for this task
pub id: u64,
/// Unsigned instructions to execute when triggered
pub instructions: Vec<Instruction>,
/// Authority that can modify or cancel this task
pub authority: Pubkey,
/// How frequently the task should be executed, in milliseconds
pub execution_interval_millis: u64,
/// Number of times this task will be executed
pub iterations: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CancelTaskRequest {
/// Unique identifier for the task to cancel
pub task_id: u64,
/// Authority that can cancel this task
pub authority: Pubkey,
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TaskContext {
/// List of requests
Expand Down
1 change: 1 addition & 0 deletions test-kit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl ExecutionTestEnv {
account_update_tx: validator_channels.account_update,
transaction_status_tx: validator_channels.transaction_status,
txn_to_process_rx: validator_channels.transaction_to_process,
tasks_tx: validator_channels.tasks_service,
environment,
};

Expand Down
Loading