From 81adeef7df63c527845ac58e6c975e95af1dbe49 Mon Sep 17 00:00:00 2001 From: ezrasisk <85811167+ezrasisk@users.noreply.github.com> Date: Mon, 10 Nov 2025 18:06:28 -0600 Subject: [PATCH 1/2] Implement episode management and rollback logic locally untestable, it *should* work though; --- kdapp/src/engine_src.rs | 285 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 kdapp/src/engine_src.rs diff --git a/kdapp/src/engine_src.rs b/kdapp/src/engine_src.rs new file mode 100644 index 00000000..b62fb52c --- /dev/null +++ b/kdapp/src/engine_src.rs @@ -0,0 +1,285 @@ +//! This module handles the logic of running and maintaining several epidodes of the same type +//! including keeping a stack of rollback objects per episode in order to support DAG reorg handling + +use borsh::{BorshDeserialize, BorshSerialize}; +use kaspa_consensus_core::Hash; +use log::*; +use secp256k1::SecretKey; + +use crate::episode::{Episode, EpisodeError, EpisodeEventHandler, EpisodeId, PayloadMetadata}; +use crate::pki::{sign_message, to_message, verify_signature, PubKey, Sig}; +use std::any::type_name; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::marker::PhantomData; +use std::sync::mpsc::Receiver; + +const EPISODE_LIFETIME: u64 = 2592000; // Three days +const SAMPLE_REMOVAL_TIME: u64 = 432000; // Half a day +const ROLLBACK_CAP: usize = 1000; // Keep only the last N rollback entries per episode + +pub(crate) struct EpisodeWrapper { + pub episode: G, + pub rollback_stack: VecDeque, +} + +#[derive(Default)] +pub struct DefaultEventHandler; + +impl EpisodeEventHandler for DefaultEventHandler { + fn on_initialize(&self, _episode_id: EpisodeId, _episode: &G) {} + + fn on_command( + &self, + _episode_id: EpisodeId, + _episode: &G, + _cmd: &::Command, + _authorization: Options, + _metadata: &PayloadMetadata, + ) {} + + fn on_rollback(&self, _episode_id: EpisodeId, _episode: &G) {} +} + +// The main entry point for running episodes of a given Episode type. +pub struct Engine = DefaultEventHandler> { + pub(crate) episodes: HashMap>, + pub(crate) revert_map: HashMap>, + pub(crate) receiver: Receiver, + pub(crate) next_filtering: u64, + pub(crate) episode_creation_times: HashMap, + + _phantom: PhantomData

, +} + +#[derive(Debug, BorshSerialize, BorshDeserialize)] +pub enum EpisodeMessage { + NewEpisode { episode_id: EpisodeId, participants: Vec }, + SignedCommand { episode_id: EpisodeId, cmd: G::Command, pubkey: PubKey, sig: Sig }, + UnsignedCommand { episode_id: EpisodeId, cmd: G::Command }, + Revert { episode_id: EpisodeId }, +} + +impl EpisodeMessage { + pub fn new_signed_command(episode_id: EpisodeId, cmd: G::Command, sk: SecretKey, pk: PubKey) -> Self { + let msg = to_message(&cmd); + let sig = sign_message(&sk, &msg); + Self::SignedCommand { episode_id, cmd, pubkey: pk, sig } + } + + pub fn episode_id(&self) -> EpisodeId { + match self { + EpisodeMessage::NewEpisode { episode_id, .. } => *episode_id, + EpisodeMessage::SignedCommand { episode_id, .. } => *episode_id, + EpisodeMessage::UnsignedCommand { episode_id, .. } => *episode_id, + EpisodeMessage::Revert { episode_id } => *episode_id, + } + } +} + +#[derive(Debug, BorshSerialize, BorshDeserialize)] +pub enum EngineMsg { + BlkAccepted { accepting_hash: Hash, accepting_daa: u64, accepting_time: u64, associated_txs: Vec<(Hash, Vec)> }, + BlkReverted { accepting_hash: Hash }, + Exit, +} + +impl EpisodeWrapper { + pub fn intialize(participants: Vec, metadata: &PayloadMetadata) -> Self { + let episode = G::initialize(participants, metadata); + let rollback_stack = VecDeque::new(); + EpisodeWrapper { episode, rollback_stack } + } + + pub fn execute_signed( + &mut self, + cmd: &G::Command, + pubkey: PubKey, + sig: Sig, + metadata: &PayloadMetadata, + ) -> Result<(), EpisodeError> { + if !self::verify_signature(&pubkey, &self::to_message(&cmd), &sig) { + return Err(EpisodeError::InvalidSignature); + } + let rollback = G::execute(&mut self.episode, cmd, Some(pubkey), metadata)?; + // Keep only the last ROLLBACK_CAP rollbacks + if self.rollback_stack.len() >= ROLLBACK_CAP { + self.rollback_stack.pop_front(); + } + self.rollback_stack.push_back(rollback); + Ok(()) + } + + pub fn execute_unsigned(&mut self, cmd: &G::Command, metadata: &PayloadMetadata) -> Result<(), EpisodeError> { + let rollback = G::execute(&mut self.episode, cmd None, metadata)?; + // keep only last ROLLBACK_CAP rollbacks + if self.rollback_stack.len() >= ROLLBACK_CAP { + self.rollback_stack.push.pop_front(); + } + self.rollback_stack.push_back(rollback); + Ok(()) + } + + pub fn rollback(&mut self) -> Result<(), EpisodeError> { + if let Some(rollback) = self.rollback_stack.pop_back() { + let res = self.episode.rollback(rollback); + if !res { + error!("Episode rollback for type {} was unsuccessful (indicates a severe bug in episode impl or engine code)", type_name::()); + } + Ok(()) + } else { + // Stack is empty, hint for episode deletion + Err(EpisodeError::DeleteEpisode) + } + } +} + +impl> Engine { + pub fn new(receiver: Receiver) -> Self { + let episodes: HashMap> = HashMap::new(); + let episode_creation_times: HashMap = HashMap::new(); + let revert_map: HashMap> = HashMap::new(); + let next_filtering: u64 = 0; + Self { episodes, revert_map, episode_creation_times, receiver, next_filtering, _phantom: Default::default() } + } + + pub fn start(&mut self, handlers: Vec) { + while let Ok(msg) = self.receiver.recv() { + match msg { + EngineMsg::BlkAccepted { accepting_hash, accepting_daa, accepting_time, associated_txs } => { + self.filter_old_episodes(accepting_daa); + let mut revert_vec: Vec<(EpisodeId, PayloadMetadata)> = vec![]; + for (tx_id, payload) in associated_txd { + let episode_action: EpisodeMessage = match borsh::from_slice(&payload) { + Ok(EpisodeMessage::Revert { episode_id }) => { + warn!("Episode: {}. Illegal revert attempted. Ignoring.", episode_id); + continue; + } + Ok(episode_action) => episode_action, + Err(err) => { + warn!("Payload: {:?} rejected. Parsing error: {}", payload, err); + continue; + } + }; + let metadata = PayloadMetadata { accepting_hash, accepting_daa, accepting_time, tx_id }; + if let Some(revert_id) = self.handle_message(episode_action, &metadata, &handlers) { + revert_vec.push(revert_id); + } + } + self.revert_map.insert(accepting_hash, revert_vec); + } + EngineMsg::BlkReverted { accepting_hash } => match self.revert_map.entry(accepting_hash) { + Entry::Occupied(entry) => { + for reversion in entry.remove().into_iter().rev() { + let episode_action: EpisodeMessage = EpisodeMessage::Revert { episode_id: reversion.0 }; + let metadata = PayloadMetadata { + accepting_hash, + accepting_daa: reversion.1.accepting_daa, + accepting_time: reversion.1.accepting_time, + tx_id: reversion.1.tx_id, + }; + assert_eq!(self.handle_message(episode_action, &metadata, &handlers), None); + } + } + Entry::Vacant(_) => {} + }, + EngineMsg::Exit => break, + } + } +} + +pub fn filter_old_episodes(&mut self, daa_score: u64) { + if daa_score > self.next_filtering + SAMPLE_REMOVAL_TIME { + let mut remove_ids = vec![]; + for (episode_id, creation_time) in self.episode_creation_times.iter() { + if creation_time < &daa_score.saturating_sub(EPISODE_LIFETIME) { + remove_ids.push(*episode_id); + } + } + for episode_id in remove_ids { + self.episodes.remove_entry(&episode_id); + self.episode_creation_times.remove_entry(&episode_id); + } + self.next_filtering = daa_score; + } +} + +pub fn handle_message( + &mut self, + episode_action: EpisodeMessage, + metadata: &PayloadMetadata, + handlers: &[H], +) -> Option<(EpisodeId, PayloadMetadata)> { + match episode_action { + EpisodeMessage::NewEpisode { episode_id, participants } => { + if self.episodes.contains_key(&episode_id) { + warn!("Episode with id {} already exists", episode_id); + return None; + } + let ew = EpisodeWrapper::::initialize(participants, metadata); + for handler in handlers.iter() { + handler.on_initialize(episode_id, &ew.episode); + } + self.episodes.insert(episode_id, ew); + debug!("Episode {} created.", episode_id); + self.episode_creation_times.insert(episode_id, metadata.accepting_daa); + + return Some((episode_id, metadata.clone())); + } + + EpisodeMessage::SignedCommand { episode_id, cmd, pubkey, sig } => { + if let Some(wrapper) = self.episodes.get_mut(&episode_id) { + match wrapper.execute_signed(&cmd, pubkey, sig, metadata) { + Ok(()) => { for handler in handlers.iter() { + handler.on_command(episode_id, &wrapper.episode, &cmd, Some(pubkey), metadata); + } + return Some((episode_id, metadata.clone())); + } + Err(e) => { + warn!("Episode {}: Command {:?} rejected: {}", episode_id, cmd, e) + } + } + } else { + warn!("Episode {} not found.", episode_id); + } + } + + EpisodeMessage::UnsignedCommand { episode_id, cmd } => { + if let Some(wrapper) = self.episodes.get_mut(&episode_id) { + match wrapper.execute_unsigned(&cmd, metadata) { + Ok(()) => { + for handler in handlers.iter() { + handler.on_command(episode_id, &wrapper.episode, &cmd, None, metadata); + } + return Some((episode_id, metadata.clone())); + } + Err(e) => { + warn!("Episode {}: Command {:?} rejected: {}", episode_id, cmd, e) + } + } + } else { + warn!("Episode {} not found.", episode_id); + } + } + + EpisodeMessage::Revert { episode_id } => { + if let Some(wrapper) = self.episodes.get_mut(&episode_id) { + info!("Episode {}: Reverting command: {:?}", episode_id metadata.tx_id); + let rollback_result = wrapper.rollback(); + for handler in handlers.iter() { + handler.on_rollback(episode_id, &wrapper.episode); + } + if let Err(EpisodeError::DeleteEpisode) = rollback_result { + // A revert of the creation + self.episodes.remove_entry(&episode_id); + self.episode_creation_times.remove_entry(&episode_id); + } + } else { + warn!("Episode {} not found.", episode_id); + } + return None; + } + } + None + } +} From 6ab00720201e62c39293fd6719456e1b4ba98abf Mon Sep 17 00:00:00 2001 From: ezrasisk <85811167+ezrasisk@users.noreply.github.com> Date: Mon, 10 Nov 2025 19:20:42 -0600 Subject: [PATCH 2/2] Fix typo in module documentation comment --- kdapp/src/engine_src.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kdapp/src/engine_src.rs b/kdapp/src/engine_src.rs index b62fb52c..9d8bda9c 100644 --- a/kdapp/src/engine_src.rs +++ b/kdapp/src/engine_src.rs @@ -1,4 +1,4 @@ -//! This module handles the logic of running and maintaining several epidodes of the same type +//! This module handles the logic of running and maintaining several episodes of the same type //! including keeping a stack of rollback objects per episode in order to support DAG reorg handling use borsh::{BorshDeserialize, BorshSerialize};