From 3ad1240324998277b14e0ca0600dcd6563c1f53d Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Sun, 12 Oct 2025 17:27:15 -0400 Subject: [PATCH 01/11] intermediiate --- Cargo.lock | 1 + libs/pillar_core/Cargo.toml | 1 + libs/pillar_core/src/accounting/state.rs | 32 ++++---- libs/pillar_core/src/blockchain/chain.rs | 61 +++++++++++++--- .../pillar_core/src/blockchain/chain_shard.rs | 5 +- libs/pillar_core/src/nodes/node.rs | 2 +- libs/pillar_core/src/persistence/mod.rs | 60 ++++++++++++++- .../src/primitives/implementations/block.rs | 5 +- libs/pillar_core/src/protocol/chain.rs | 2 +- .../pillar_core/src/protocol/serialization.rs | 20 ++++- libs/pillar_crypto/src/merkle_trie.rs | 73 ++++++++++++++++++- libs/pillar_serialize/src/lib.rs | 31 +++++++- 12 files changed, 251 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0927acc..19d4bbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1112,6 +1112,7 @@ dependencies = [ "ctor", "flume", "futures-util", + "hex", "lfqueue", "lz4_flex", "pillar_crypto", diff --git a/libs/pillar_core/Cargo.toml b/libs/pillar_core/Cargo.toml index 210839a..9105705 100644 --- a/libs/pillar_core/Cargo.toml +++ b/libs/pillar_core/Cargo.toml @@ -23,3 +23,4 @@ lz4_flex = "0.11.5" bytemuck = {version="1.23.2", features=["derive"]} bytes = "1.10.1" byteorder = "1.5.0" +hex = "0.4.3" diff --git a/libs/pillar_core/src/accounting/state.rs b/libs/pillar_core/src/accounting/state.rs index febf806..524baea 100644 --- a/libs/pillar_core/src/accounting/state.rs +++ b/libs/pillar_core/src/accounting/state.rs @@ -12,9 +12,9 @@ pub type ReputationMap = HashMap; #[derive(Clone, Default)] pub struct StateManager{ // The mapping from address to account - pub state_trie: Arc>>, + pub state_trie: MerkleTrie, /// mapping of reputations for peers - pub reputations: Arc>, + pub reputations: ReputationMap, } impl Debug for StateManager { @@ -36,32 +36,29 @@ impl StateManager{ /// Create a new state manager with an empty trie and no reputations. pub fn new() -> Self { StateManager { - state_trie: Arc::new(Mutex::new(MerkleTrie::new())), - reputations: Arc::new(Mutex::new(HashMap::new())), + state_trie: MerkleTrie::new(), + reputations: HashMap::new(), } } /// Get an account by address at a specific `state_root` (if present). pub fn get_account(&self, address: &StdByteArray, state_root: StdByteArray) -> Option { - let state_trie = self.state_trie.lock().expect("Failed to lock state trie"); - state_trie.get(address, state_root) + self.state_trie.get(address, state_root) } /// Get an account or a default zero-balance account for the given address. pub fn get_account_or_default(&self, address: &StdByteArray, state_root: StdByteArray) -> Account { - let state_trie = self.state_trie.lock().expect("Failed to lock state trie"); - state_trie.get(address, state_root).unwrap_or(Account::new(*address, 0)) + self.state_trie.get(address, state_root).unwrap_or(Account::new(*address, 0)) } /// Return all accounts reachable from `root`. pub fn get_all_accounts(&self, root: StdByteArray) -> Vec{ - self.state_trie.lock().unwrap().get_all(root) + self.state_trie.get_all(root) } /// Remove a branched root and decrement reference counts, pruning unique nodes. pub fn remove_branch(&mut self, root: StdByteArray){ - let mut state_trie = self.state_trie.lock().expect("Failed to lock state trie"); - state_trie.trim_branch(root).expect("Failed to remove branch from state trie"); + self.state_trie.trim_branch(root).expect("Failed to remove branch from state trie"); } /// Apply a complete block to produce a new state root branch. @@ -99,13 +96,12 @@ impl StateManager{ // Update the accounts from the block let mut state_updates: HashMap = HashMap::new(); let state_root = prev_header.completion.as_ref().expect("Previous block should be complete").state_root; - let mut state_trie = self.state_trie.lock().expect("Failed to lock state trie"); for transaction in &block.transactions { let mut sender = match state_updates.get(&transaction.header.sender){ Some(account) => account.clone(), None => { // if the sender does not exist, we create a new account with 0 balance - let account = state_trie + let account = self.state_trie .get(&transaction.header.sender, state_root) .unwrap_or(Account::new(transaction.header.sender, 0)); @@ -124,7 +120,7 @@ impl StateManager{ let mut receiver = match state_updates.get(&transaction.header.receiver){ Some(account) => account.clone(), None => { - state_trie.get(&transaction.header.receiver, state_root).unwrap_or(Account::new(transaction.header.receiver, 0)) + self.state_trie.get(&transaction.header.receiver, state_root).unwrap_or(Account::new(transaction.header.receiver, 0)) }, }; // update balances @@ -138,7 +134,7 @@ impl StateManager{ Some(account) => account.clone(), None => { // if the miner account does not exist, we create a new account with 0 balance - state_trie.get(miner_address, state_root).unwrap_or(Account::new(*miner_address, 0)) + self.state_trie.get(miner_address, state_root).unwrap_or(Account::new(*miner_address, 0)) } }; miner_account.balance += if !por_enabled {reward} else {div_up(reward, POR_MINER_SHARE_DIVISOR)}; @@ -161,7 +157,7 @@ impl StateManager{ let mut stamper_account = match state_updates.get(stamper) { Some(account) => account.clone(), None => { - state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) + self.state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) } }; stamper_account.balance += stamper_reward; @@ -183,7 +179,7 @@ impl StateManager{ let mut stamper = match state_updates.get(stamper){ Some(account) => account.clone(), None => { - state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) + self.state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) } }; if stamper.history.is_none(){ @@ -194,6 +190,6 @@ impl StateManager{ state_updates.insert(stamper.address, stamper); } // branch the state trie with the updates - state_trie.branch(Some(state_root), state_updates).expect("Issue with branching state trie") + self.state_trie.branch(Some(state_root), state_updates).expect("Issue with branching state trie") } } \ No newline at end of file diff --git a/libs/pillar_core/src/blockchain/chain.rs b/libs/pillar_core/src/blockchain/chain.rs index ebba4f9..820c873 100644 --- a/libs/pillar_core/src/blockchain/chain.rs +++ b/libs/pillar_core/src/blockchain/chain.rs @@ -1,13 +1,11 @@ -use core::hash; -use std::{collections::{HashMap, HashSet}, result}; +use std::{collections::{HashMap, HashSet}}; -use lz4_flex::block; use pillar_crypto::{hashing::DefaultHash, signing::{DefaultVerifier, SigVerFunction}, types::StdByteArray}; use tracing::instrument; use crate::{ - accounting::{account::Account, state::StateManager}, primitives::{block::{Block, BlockHeader}, errors::BlockValidationError, transaction::Transaction}, protocol::{chain::get_genesis_block, pow::get_difficulty_for_block, reputation::{get_current_reputations_for_stampers, get_current_reputations_for_stampers_from_state}} + accounting::{account::Account, state::StateManager}, persistence::{ Persistable}, primitives::{block::{Block, BlockHeader}, errors::BlockValidationError, transaction::Transaction}, protocol::{chain::get_genesis_block, pow::get_difficulty_for_block, reputation::{get_current_reputations_for_stampers, get_current_reputations_for_stampers_from_state}} }; use super::TrimmableChain; @@ -33,12 +31,9 @@ pub struct Chain { impl Chain { /// Creates a new blockchain with a genesis block. pub fn new_with_genesis() -> Self { - let state_manager = StateManager::new(); + let mut state_manager = StateManager::new(); let state_root = state_manager .state_trie - .lock() - .as_mut() - .unwrap() .create_genesis([0; 32], Account::default()) .expect("Failed to create genesis state root"); @@ -66,11 +61,12 @@ impl Chain { } /// Create a chain without a genesis block. + /// Does NOT set state manager #[instrument( skip_all, name = "Chain::new_from_blocks", )] - pub fn new_from_blocks(blocks: HashMap) -> Self{ + pub(crate) unsafe fn new_from_blocks(blocks: HashMap) -> Self{ let mut headers = HashMap::new(); let mut deepest_hash = [0; 32]; let mut depth = 0; @@ -116,7 +112,7 @@ impl Chain { // get all reputations according to previous block let reputations = get_current_reputations_for_stampers(self, &block.header).values().cloned().collect::>(); - let (expected_target, is_por) = get_difficulty_for_block(&block.header, &reputations); + let (expected_target, _is_por) = get_difficulty_for_block(&block.header, &reputations); if block.header.completion.is_none() || expected_target != block.header.completion.as_ref().unwrap().difficulty_target { tracing::info!("Block difficulty target is invalid - Failing"); @@ -936,4 +932,49 @@ mod tests { assert!(chain.blocks.contains_key(&main_hash)); assert_eq!(chain.leaves.len(), 1); // Only the main chain remains } + + #[tokio::test] + async fn test_new_from_blocks(){ + let mut chain = Chain::new_with_genesis(); + let mut signing_key = DefaultSigner::generate_random(); + // public + let sender = signing_key.get_verifying_function().to_bytes(); + + let mut parent_hash = chain.deepest_hash; + let genesis_hash = parent_hash; + + // Build a main chain of depth 5 + for depth in 1..=5 { + let time = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() + depth; + let mut transaction = Transaction::new(sender, [2; 32], 0, time, depth-1, &mut DefaultHash::new()); + transaction.sign(&mut signing_key); + let mut block = Block::new( + parent_hash, + 0, + time, + vec![transaction], + None, + BlockTail::default().stamps, + depth, + None, + None, + &mut DefaultHash::new(), + ); + let prev_header = chain.headers.get(&block.header.previous_hash) + .expect("Previous block header not found"); + let state_root = chain.state_manager.branch_from_block_internal(&block, prev_header, &sender); + mine(&mut block, sender, state_root, vec![], None, DefaultHash::new()).await; + parent_hash = block.header.completion.as_ref().unwrap().hash; + chain.add_new_block(block).unwrap(); + } + + let blocks: HashMap = chain.blocks.clone(); + let new_chain = unsafe { Chain::new_from_blocks(blocks) }; + assert_eq!(new_chain.depth, chain.depth); + assert_eq!(new_chain.leaves, chain.leaves); + assert_eq!(new_chain.blocks, chain.blocks); + assert_eq!(new_chain.headers, chain.headers); + assert_eq!(new_chain.deepest_hash, chain.deepest_hash); + + } } diff --git a/libs/pillar_core/src/blockchain/chain_shard.rs b/libs/pillar_core/src/blockchain/chain_shard.rs index 7e27067..37cf7d7 100644 --- a/libs/pillar_core/src/blockchain/chain_shard.rs +++ b/libs/pillar_core/src/blockchain/chain_shard.rs @@ -20,11 +20,8 @@ impl ChainShard{ /// ensures the hashs are good, and the depths work pub fn validate(&self) -> Result<(), BlockValidationError>{ let mut genesis_found = false; - let state_manager = StateManager::new(); + let mut state_manager = StateManager::new(); let state_root = state_manager.state_trie - .lock() - .as_mut() - .unwrap() .create_genesis([0; 32], Account::default()).unwrap(); for (declared_hash, header) in &self.headers { diff --git a/libs/pillar_core/src/nodes/node.rs b/libs/pillar_core/src/nodes/node.rs index cb183ac..3bde468 100644 --- a/libs/pillar_core/src/nodes/node.rs +++ b/libs/pillar_core/src/nodes/node.rs @@ -579,7 +579,7 @@ impl From for Peer { } } -pub trait Broadcaster { +pub(crate) trait Broadcaster { /// Broadcast a message to all peers. async fn broadcast(&self, message: &Message) -> Result, std::io::Error>; } diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index 3ea3f13..47da55e 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -1 +1,59 @@ -pub mod database; \ No newline at end of file +use std::{collections::HashMap, path::PathBuf}; + +use pillar_crypto::types::StdByteArray; +use pillar_serialize::PillarSerialize; + +use crate::{blockchain::chain::Chain, primitives::block::Block}; + +pub mod database; + +pub(crate) trait Persistable + where Self: Sized + PillarSerialize +{ + async fn save(&self, path: &PathBuf) -> Result<(), std::io::Error>{ + let bytes = self.serialize_pillar()?; + tokio::fs::write(path, bytes).await + } + async fn load(path: &PathBuf) -> Result { + let bytes = tokio::fs::read(path).await?; + Self::deserialize_pillar(&bytes) + } +} + +impl Persistable for Block{} // we are using the default implementations, which is just serialize and save to a path + +// chains serialization is to save one block per file +impl Persistable for Chain { + async fn save(&self, path: &std::path::PathBuf) -> Result<(), std::io::Error> { + if !std::fs::metadata(path)?.is_dir(){ + return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Path must be a directory")); + } + for (block_hash, block) in &self.blocks { + let block_path = path.join(format!("{}.bin", hex::encode(block_hash))); + block.save(&block_path).await?; + } + Ok(()) + } + + async fn load(path: &std::path::PathBuf) -> Result { + if !std::fs::metadata(path)?.is_dir(){ + return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Path must be a directory")); + } + let block_files = std::fs::read_dir(path)?; + let mut blocks: HashMap = HashMap::new(); + for entry in block_files { + let entry = entry?; + if entry.file_type()?.is_file() { + let hash: StdByteArray = hex::decode(entry.file_name().to_str().unwrap()) + .unwrap() + .try_into() + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid hash in filename"))?; + let block = Block::deserialize_pillar(&tokio::fs::read(entry.path()).await?)?; + blocks.insert(hash, block); + } + } + let chain = unsafe{Chain::new_from_blocks(blocks)}; + todo!("remove unsafe call"); + Ok(chain) + } +} \ No newline at end of file diff --git a/libs/pillar_core/src/primitives/implementations/block.rs b/libs/pillar_core/src/primitives/implementations/block.rs index 00faf26..0ccef50 100644 --- a/libs/pillar_core/src/primitives/implementations/block.rs +++ b/libs/pillar_core/src/primitives/implementations/block.rs @@ -1,7 +1,7 @@ use pillar_crypto::{hashing::{DefaultHash, HashFunction}, merkle::{generate_tree, MerkleTree}, proofs::{generate_proof_of_inclusion, verify_proof_of_inclusion, MerkleProof}, types::StdByteArray}; -use crate::{primitives::{block::{Block, BlockHeader, BlockTail, Stamp}, transaction::Transaction}, protocol::reputation::N_TRANSMISSION_SIGNATURES}; +use crate::{persistence::Persistable, primitives::{block::{Block, BlockHeader, BlockTail, Stamp}, transaction::Transaction}, protocol::reputation::N_TRANSMISSION_SIGNATURES}; impl Block { /// Create a new block @@ -74,4 +74,5 @@ impl Block { false } } -} \ No newline at end of file +} + diff --git a/libs/pillar_core/src/protocol/chain.rs b/libs/pillar_core/src/protocol/chain.rs index c2a0fb6..a3a6f09 100644 --- a/libs/pillar_core/src/protocol/chain.rs +++ b/libs/pillar_core/src/protocol/chain.rs @@ -268,7 +268,7 @@ pub async fn service_sync(node: Node, leaves: &Vec) -> Result Result, std::io::Error> { + let mut bytes = vec![]; + let reputation_bytes = self.reputations.serialize_pillar()?; + bytes.extend((reputation_bytes.len() as u32).to_le_bytes()); + bytes.extend(reputation_bytes); + + Ok(bytes) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + todo!() + } +} + impl PillarSerialize for Block{ /// Serialize a `Block` as `[header][transactions...]`. /// @@ -214,6 +229,7 @@ impl PillarSerialize for Chain { buffer.extend(self.deepest_hash.serialize_pillar()?); // TODO maybe big clone buffer.extend(self.leaves.iter().cloned().collect::>().serialize_pillar()?); + // Ok(buffer) } @@ -245,8 +261,8 @@ impl PillarSerialize for Chain { let leaves = Vec::::deserialize_pillar(&data[offset..])?; Ok(Chain { - blocks, - headers, + blocks, + headers, depth, deepest_hash, leaves: leaves.into_iter().collect(), diff --git a/libs/pillar_crypto/src/merkle_trie.rs b/libs/pillar_crypto/src/merkle_trie.rs index 22fe8a1..b686688 100644 --- a/libs/pillar_crypto/src/merkle_trie.rs +++ b/libs/pillar_crypto/src/merkle_trie.rs @@ -7,10 +7,11 @@ use std::{collections::{HashMap, HashSet, VecDeque}, fmt::Debug, marker::PhantomData}; -use pillar_serialize::PillarSerialize; -use slotmap::{new_key_type, SlotMap}; +use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; +use slotmap::{new_key_type, KeyData, SlotMap}; use crate::{hashing::{DefaultHash, HashFunction, Hashable}, types::StdByteArray}; + new_key_type! { pub struct NodeKey; } /// In order to store account states, a Merkle Patricia Trie will be used @@ -55,6 +56,7 @@ impl TrieNode{ } +#[derive(Clone)] pub struct MerkleTrie { _phantum: PhantomData, pub(crate) nodes: SlotMap>, // SlotMap to store Trie nodes @@ -336,7 +338,74 @@ impl MerkleTrie { } +impl PillarSerialize for NodeKey { + fn serialize_pillar(&self) -> Result, std::io::Error> { + Ok(self.0.as_ffi().serialize_pillar()?) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + Ok(NodeKey(KeyData::from_ffi( + u64::from_le_bytes(data[0..8].try_into().unwrap()) + ))) + } +} + +impl PillarSerialize for MerkleTrie { + fn serialize_pillar(&self) -> Result, std::io::Error> { + let mut buffer = Vec::new(); + buffer.extend((self.nodes.len() as u32).to_le_bytes()); + for (key, node) in &self.nodes { + let mut internal_buffer = vec![]; + // key + internal_buffer.extend(key.serialize_pillar()?); // always 8 + // references + internal_buffer.extend(node.references.to_le_bytes()); // always 2 + // value + let vbuff = node.value.serialize_pillar()?; + internal_buffer.extend((vbuff.len() as u32).to_le_bytes()); + internal_buffer.extend(vbuff); + // children + let d = node.children.serialize_pillar()?; + internal_buffer.extend((d.len() as u32).to_le_bytes()); + internal_buffer.extend(d); + + // give length and add + buffer.extend((internal_buffer.len() as u32).to_le_bytes()); + buffer.extend(internal_buffer); + } + buffer.extend(self.roots.serialize_pillar()?); + Ok(buffer) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + let n = u32::from_le_bytes(data[0..4].try_into().unwrap()); + let mut offset = 4; + let nodes = SlotMap::new(); + for _ in 0..n { + let key = NodeKey::deserialize_pillar(&data[offset..offset + 8])?; + offset += 8; + let references = u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap()); + offset += 2; + let value_size = u32::from_le_bytes(data[offset..offset+4].try_into().unwrap()) as usize; + offset += 4; + let values: Option> = Option::>::deserialize_pillar(&data[offset.. offset + value_size])?; + offset += value_size; + let children_size = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + let children: [Option; 16] = <[Option; 16]>::deserialize_pillar(&data[offset.. offset + children_size])?; + offset += children_size; + let node = TrieNode{ + references: references, + children: children, + value: values, + _phantum: PhantomData + }; + nodes.insert(value) + } + todo!(); + } +} #[cfg(test)] mod tests { diff --git a/libs/pillar_serialize/src/lib.rs b/libs/pillar_serialize/src/lib.rs index def4360..b5f7fec 100644 --- a/libs/pillar_serialize/src/lib.rs +++ b/libs/pillar_serialize/src/lib.rs @@ -89,7 +89,7 @@ impl PillarFixedSize for i8 {} impl PillarFixedSize for i16 {} impl PillarFixedSize for i32 {} impl PillarFixedSize for i64 {} -impl PillarFixedSize for [u8; 32] {} +impl PillarFixedSize for [u8; C] {} impl PillarNativeEndian for StdByteArray { fn to_le(&mut self) {} @@ -335,4 +335,33 @@ impl PillarSerialize for String { .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid UTF-8"))?; Ok(string) } +} + +impl PillarSerialize for [Option; C]{ + fn serialize_pillar(&self) -> Result, std::io::Error> { + let mut buffer = Vec::new(); + for item in self { + let internal = item.serialize_pillar()?; + buffer.extend((internal.len() as u32).to_le_bytes()); + buffer.extend(internal); + } + Ok(buffer) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + let mut offset = 0; + let mut array: [Option; C] = [None; C]; + for i in 0..C { + let length = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + let item = if length > 0 { + Some(T::deserialize_pillar(&data[offset..offset + length])?) + } else { + None + }; + array[i] = item; + offset += length; + } + Ok(array) + } } \ No newline at end of file From 9c370a1386fec50a5092a2abaaebd0ebdff2987d Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Sun, 12 Oct 2025 17:54:02 -0400 Subject: [PATCH 02/11] perhaps works, needs merkle trie serialize deserialize tesr --- libs/pillar_crypto/src/merkle_trie.rs | 86 ++++++++++++++++----------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/libs/pillar_crypto/src/merkle_trie.rs b/libs/pillar_crypto/src/merkle_trie.rs index b686688..19215da 100644 --- a/libs/pillar_crypto/src/merkle_trie.rs +++ b/libs/pillar_crypto/src/merkle_trie.rs @@ -4,7 +4,7 @@ //! into nibbles (0-15). Values are serialized using `PillarSerialize`. The //! structure supports multiple roots for branching state (e.g., competing //! chain tips) while sharing unchanged subtrees. -use std::{collections::{HashMap, HashSet, VecDeque}, fmt::Debug, marker::PhantomData}; +use std::{collections::{HashMap, HashSet, VecDeque}, fmt::Debug, hash::Hash, marker::PhantomData}; use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; @@ -355,55 +355,69 @@ impl PillarSerialize for Merk let mut buffer = Vec::new(); buffer.extend((self.nodes.len() as u32).to_le_bytes()); for (key, node) in &self.nodes { - let mut internal_buffer = vec![]; - // key - internal_buffer.extend(key.serialize_pillar()?); // always 8 - // references - internal_buffer.extend(node.references.to_le_bytes()); // always 2 - // value - let vbuff = node.value.serialize_pillar()?; - internal_buffer.extend((vbuff.len() as u32).to_le_bytes()); - internal_buffer.extend(vbuff); - // children - let d = node.children.serialize_pillar()?; - internal_buffer.extend((d.len() as u32).to_le_bytes()); - internal_buffer.extend(d); - - // give length and add - buffer.extend((internal_buffer.len() as u32).to_le_bytes()); - buffer.extend(internal_buffer); + buffer.extend(key.serialize_pillar()?); + buffer.extend(node.references.serialize_pillar()?); + let vbuff = node.value.serialize_pillar()?; + buffer.extend((vbuff.len() as u32).to_le_bytes()); + buffer.extend(vbuff); + } + for (_, node) in &self.nodes { + // do children now + let cbuff = node.children.serialize_pillar()?; + buffer.extend((cbuff.len() as u32).to_le_bytes()); + buffer.extend(cbuff); } buffer.extend(self.roots.serialize_pillar()?); Ok(buffer) } fn deserialize_pillar(data: &[u8]) -> Result { + let mut key_map: HashMap = HashMap::new(); let n = u32::from_le_bytes(data[0..4].try_into().unwrap()); let mut offset = 4; - let nodes = SlotMap::new(); + let mut nodes = SlotMap::with_key(); + let mut order = vec![]; for _ in 0..n { let key = NodeKey::deserialize_pillar(&data[offset..offset + 8])?; - offset += 8; - let references = u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap()); + offset+=8; + let references = u16::from_le_bytes(data[offset..offset+2].try_into().unwrap()); offset += 2; - let value_size = u32::from_le_bytes(data[offset..offset+4].try_into().unwrap()) as usize; - offset += 4; - let values: Option> = Option::>::deserialize_pillar(&data[offset.. offset + value_size])?; - offset += value_size; - let children_size = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + let vlength = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; offset += 4; - let children: [Option; 16] = <[Option; 16]>::deserialize_pillar(&data[offset.. offset + children_size])?; - offset += children_size; - let node = TrieNode{ - references: references, - children: children, - value: values, - _phantum: PhantomData - }; - nodes.insert(value) + let value = Option::>::deserialize_pillar(&data[offset..offset + vlength])?; + offset += vlength; + let mut node: TrieNode = TrieNode::new(); + node.value = value; + node.references = references; + let new_key = nodes.insert(node); + key_map.insert(key, new_key); + order.push(new_key); + } + + for i in 0..n { + let length = u32::from_le_bytes(data[offset..offset+4].try_into().unwrap()) as usize; + let children: [Option; 16] = <[Option; 16]>::deserialize_pillar(&data[offset..offset + length])? + .map(|old_key: Option|{ + match old_key { + None => None, + Some(key) => key_map.get(&key).cloned() + } + }); + offset += length; + let node = nodes.get_mut(*order.get(i as usize).unwrap()).unwrap(); + node.children = children; + } + + let mut roots: HashMap = HashMap::::deserialize_pillar(&data[offset..])?; + for (k, v) in roots.clone() { + roots.insert(k.clone(), key_map.get(&v).cloned().unwrap()); } - todo!(); + Ok(MerkleTrie { + roots: roots, + _phantum: PhantomData, + nodes: nodes + }) } } From eb90277c108303cefd061b7d28f2139b2fecaac6 Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Sun, 12 Oct 2025 23:00:40 -0400 Subject: [PATCH 03/11] trie serialize complete --- libs/pillar_crypto/src/merkle_trie.rs | 128 +++++++++++++++++++++++--- libs/pillar_serialize/src/lib.rs | 6 +- 2 files changed, 116 insertions(+), 18 deletions(-) diff --git a/libs/pillar_crypto/src/merkle_trie.rs b/libs/pillar_crypto/src/merkle_trie.rs index 19215da..49bf1b9 100644 --- a/libs/pillar_crypto/src/merkle_trie.rs +++ b/libs/pillar_crypto/src/merkle_trie.rs @@ -355,8 +355,8 @@ impl PillarSerialize for Merk let mut buffer = Vec::new(); buffer.extend((self.nodes.len() as u32).to_le_bytes()); for (key, node) in &self.nodes { - buffer.extend(key.serialize_pillar()?); - buffer.extend(node.references.serialize_pillar()?); + buffer.extend(key.serialize_pillar()?); // 8 bytes + buffer.extend(node.references.serialize_pillar()?); // 2 bytes let vbuff = node.value.serialize_pillar()?; buffer.extend((vbuff.len() as u32).to_le_bytes()); buffer.extend(vbuff); @@ -396,6 +396,7 @@ impl PillarSerialize for Merk for i in 0..n { let length = u32::from_le_bytes(data[offset..offset+4].try_into().unwrap()) as usize; + offset += 4; let children: [Option; 16] = <[Option; 16]>::deserialize_pillar(&data[offset..offset + length])? .map(|old_key: Option|{ match old_key { @@ -619,18 +620,18 @@ mod tests { fn test_proof_for_branch() { let initial_account_info = AccountState { balance: 100, nonce: 1 }; // let (mut trie, initial_root) = MerkleTrie::<&str, AccountState>::new("account0", initial_account_info.clone()); - let mut trie = MerkleTrie::<&str, AccountState>::new(); - let initial_root = trie.create_genesis("account0", initial_account_info.clone()).expect("Failed to create genesis"); + let mut trie = MerkleTrie::::new(); + let initial_root = trie.create_genesis("account0".into(), initial_account_info.clone()).expect("Failed to create genesis"); let account1 = AccountState { balance: 200, nonce: 2 }; - trie.insert("account1", account1.clone(), initial_root).unwrap(); + trie.insert("account1".into(), account1.clone(), initial_root).unwrap(); // let branch_keys = vec![("account1", AccountState { balance: 300, nonce: 3 })]; let mut branch_keys = HashMap::new(); - branch_keys.insert("account1", AccountState { balance: 300, nonce: 3 }); + branch_keys.insert("account1".into(), AccountState { balance: 300, nonce: 3 }); let new_root = trie.branch(Some(initial_root), branch_keys).unwrap(); - let (proof, _) = generate_proof_of_state(&trie, "account1", Some(new_root), &mut DefaultHash::new()).expect("Proof generation failed for branch"); + let (proof, _) = generate_proof_of_state(&trie, "account1".into(), Some(new_root), &mut DefaultHash::new()).expect("Proof generation failed for branch"); let root_key = trie.roots.get(&new_root).expect("Root not found"); @@ -647,6 +648,29 @@ mod tests { &mut DefaultHash::new(), ); assert!(valid, "Proof verification failed for branch"); + + // test serialize + let bytes = trie.serialize_pillar().unwrap(); + let trie2 = MerkleTrie::::deserialize_pillar(&bytes).unwrap(); + + let (proof, _) = generate_proof_of_state(&trie2, "account1".into(), Some(new_root), &mut DefaultHash::new()).expect("Proof generation failed for branch"); + + let root_key = trie2.roots.get(&new_root).expect("Root not found"); + + let valid = proof.verify( + AccountState { balance: 300, nonce: 3 }.serialize_pillar().unwrap(), + trie2.get_hash_for(*root_key, &mut DefaultHash::new()).unwrap(), + &mut DefaultHash::new(), + ); + assert!(valid, "Proof verification failed for branch"); + + let valid = proof.verify( + AccountState { balance: 300, nonce: 3 }.serialize_pillar().unwrap(), + trie2.get_hash_for(*root_key, &mut DefaultHash::new()).unwrap(), + &mut DefaultHash::new(), + ); + assert!(valid, "Proof verification failed for branch"); + } #[test] @@ -744,23 +768,23 @@ mod tests { #[test] fn test_get_all() { let initial_account_info = AccountState { balance: 100, nonce: 1 }; - let mut trie = MerkleTrie::<&str, AccountState>::new(); - let initial_root = trie.create_genesis("account0", initial_account_info.clone()).expect("Failed to create genesis"); + let mut trie = MerkleTrie::::new(); + let initial_root = trie.create_genesis("account0".into(), initial_account_info.clone()).expect("Failed to create genesis"); let account1 = AccountState { balance: 200, nonce: 2 }; - trie.insert("account1", account1.clone(), initial_root).unwrap(); + trie.insert("account1".into(), account1.clone(), initial_root).unwrap(); let account2 = AccountState { balance: 300, nonce: 3 }; - trie.insert("account2", account2.clone(), initial_root).unwrap(); + trie.insert("account2".into(), account2.clone(), initial_root).unwrap(); let account3 = AccountState { balance: 400, nonce: 4 }; - trie.insert("account3", account3.clone(), initial_root).unwrap(); + trie.insert("account3".into(), account3.clone(), initial_root).unwrap(); // branch let account4 = AccountState { balance: 4000, nonce: 0}; let mut updates = HashMap::new(); - updates.insert("account4", account4.clone()); + updates.insert("account4".into(), account4.clone()); let _ = trie.branch(Some(initial_root), updates).unwrap(); let all_values = trie.get_all(initial_root); @@ -771,6 +795,20 @@ mod tests { assert!(all_values.contains(&account2)); assert!(all_values.contains(&account3)); assert!(!all_values.contains(&account4)); + + // quick serialize + let bytes = trie.serialize_pillar().unwrap(); + let trie2 = MerkleTrie::::deserialize_pillar(&bytes).unwrap(); + + let all_values = trie2.get_all(initial_root); + + assert_eq!(all_values.len(), 4); // account0, account1, account2, account3 + assert!(all_values.contains(&initial_account_info)); + assert!(all_values.contains(&account1)); + assert!(all_values.contains(&account2)); + assert!(all_values.contains(&account3)); + assert!(!all_values.contains(&account4)); + } #[test] @@ -874,4 +912,68 @@ mod tests { } + + #[test] + fn test_serialize_trie() { + let initial_account_info = AccountState { balance: 100, nonce: 1 }; + let mut trie = MerkleTrie::::new(); + let initial_root = trie.create_genesis("account0".into(), initial_account_info.clone()).expect("Failed to create genesis"); + + let account1 = AccountState { balance: 200, nonce: 2 }; + trie.insert("account1".into(), account1.clone(), initial_root).unwrap(); + + let account2 = AccountState { balance: 300, nonce: 3 }; + trie.insert("account2".into(), account2.clone(), initial_root).unwrap(); + + // branch + let mut updates = HashMap::new(); + updates.insert("account3".into(), AccountState { balance: 2000, nonce: 44 }); + let new_root = trie.branch(Some(initial_root), updates).unwrap(); + + // branch again + let mut updates2 = HashMap::new(); + updates2.insert("account4".into(), AccountState { balance: 0, nonce: 1 }); + let new_root2 = trie.branch(Some(new_root), updates2).unwrap(); + + // check that the new root is still there + assert!(trie.get(&"account3".into(), new_root).is_some()); + assert!(trie.get(&"account4".into(), new_root2).is_some()); + + // check that the old root is still there + assert_eq!(trie.get(&"account1".into(), initial_root), Some(account1.clone())); + assert_eq!(trie.get(&"account2".into(), initial_root), Some(account2.clone())); + assert_eq!(trie.get(&"account0".into(), initial_root), Some(initial_account_info.clone())); + + // getall, make sure account 3 does not exist + let all_values = trie.get_all(initial_root); + assert_eq!(all_values.len(), 3); // account0, account1, account2 + + // trim the branch + trie.trim_branch(new_root).expect("Failed to trim branch"); + + // check that the new root is still there + assert!(trie.get(&"account3".into(), new_root).is_none()); + assert!(trie.get(&"account3".into(), new_root2).is_some()); + + // check that the old root is still there + assert_eq!(trie.get(&"account1".into(), initial_root), Some(account1.clone())); + assert_eq!(trie.get(&"account2".into(), initial_root), Some(account2.clone())); + assert_eq!(trie.get(&"account0".into(), initial_root), Some(initial_account_info.clone())); + + // ========================= test serialize ========================= + let bytes = trie.serialize_pillar().unwrap(); + let trie2 = MerkleTrie::::deserialize_pillar(&bytes).unwrap(); + + // same assettions + + // check that the new root is still there + assert!(trie2.get(&"account3".into(), new_root).is_none()); + assert!(trie2.get(&"account3".into(), new_root2).is_some()); + + // check that the old root is still there + assert_eq!(trie2.get(&"account1".into(), initial_root), Some(account1)); + assert_eq!(trie2.get(&"account2".into(), initial_root), Some(account2)); + assert_eq!(trie2.get(&"account0".into(), initial_root), Some(initial_account_info)); + + } } \ No newline at end of file diff --git a/libs/pillar_serialize/src/lib.rs b/libs/pillar_serialize/src/lib.rs index b5f7fec..91a0160 100644 --- a/libs/pillar_serialize/src/lib.rs +++ b/libs/pillar_serialize/src/lib.rs @@ -354,11 +354,7 @@ impl PillarSerialize for [Option; for i in 0..C { let length = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; offset += 4; - let item = if length > 0 { - Some(T::deserialize_pillar(&data[offset..offset + length])?) - } else { - None - }; + let item = Option::::deserialize_pillar(&data[offset..offset + length])?; array[i] = item; offset += length; } From 025cef7e64820d104eb7cd9cc0584ec2fe75b01d Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Sun, 12 Oct 2025 23:15:10 -0400 Subject: [PATCH 04/11] state serialization --- libs/pillar_core/src/accounting/state.rs | 2 +- libs/pillar_core/src/persistence/mod.rs | 2 +- libs/pillar_core/src/protocol/serialization.rs | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/libs/pillar_core/src/accounting/state.rs b/libs/pillar_core/src/accounting/state.rs index 524baea..bf8e060 100644 --- a/libs/pillar_core/src/accounting/state.rs +++ b/libs/pillar_core/src/accounting/state.rs @@ -1,5 +1,5 @@ //! State management built on a Merkle trie with optional reputation tracking. -use std::{collections::HashMap, fmt::Debug, sync::{Arc, Mutex}}; +use std::{collections::HashMap, fmt::Debug}; use pillar_crypto::{merkle_trie::MerkleTrie, types::StdByteArray}; diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index 47da55e..13594f2 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -31,7 +31,7 @@ impl Persistable for Chain { for (block_hash, block) in &self.blocks { let block_path = path.join(format!("{}.bin", hex::encode(block_hash))); block.save(&block_path).await?; - } + } Ok(()) } diff --git a/libs/pillar_core/src/protocol/serialization.rs b/libs/pillar_core/src/protocol/serialization.rs index b07907a..b131843 100644 --- a/libs/pillar_core/src/protocol/serialization.rs +++ b/libs/pillar_core/src/protocol/serialization.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use pillar_crypto::types::{StdByteArray, STANDARD_ARRAY_LENGTH}; +use pillar_crypto::{merkle_trie::MerkleTrie, types::{StdByteArray, STANDARD_ARRAY_LENGTH}}; use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; use tokio::{io::AsyncReadExt, net::TcpStream}; @@ -176,7 +176,10 @@ impl PillarSerialize for StateManager { } fn deserialize_pillar(data: &[u8]) -> Result { - todo!() + let trielen = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize; + let trie = MerkleTrie::::deserialize_pillar(&data[4..trielen + 4])?; + let reputations = HashMap::::deserialize_pillar(&data[trielen + 4..])?; + Ok(StateManager { state_trie: trie, reputations }) } } From 3a882bcc79f6df573d08f3da9a66ed096f487cca Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Mon, 13 Oct 2025 13:01:30 -0400 Subject: [PATCH 05/11] persistable trait implmented for Chain --- libs/pillar_core/src/blockchain/chain.rs | 52 +++++++----- libs/pillar_core/src/persistence/mod.rs | 101 ++++++++++++++++++----- 2 files changed, 113 insertions(+), 40 deletions(-) diff --git a/libs/pillar_core/src/blockchain/chain.rs b/libs/pillar_core/src/blockchain/chain.rs index 820c873..22e9368 100644 --- a/libs/pillar_core/src/blockchain/chain.rs +++ b/libs/pillar_core/src/blockchain/chain.rs @@ -286,6 +286,37 @@ impl Chain { Ok(()) } + /// retruns False if the existing deepest is better + /// returns true if it shold be replaced + #[inline] + pub(crate) fn block_greater(&self, new_block: &Block) -> bool { + if self.depth > new_block.header.depth { + false + } else if self.depth == new_block.header.depth { + // take the block which is stamped with the most reputation + let current_deepest = self.get_top_block().unwrap(); + let existing_reputation: f64 = get_current_reputations_for_stampers_from_state( + &self.state_manager, + self.headers.get(¤t_deepest.header.previous_hash).unwrap(), + ¤t_deepest.header + ).values().sum(); + let new_reputation: f64 = get_current_reputations_for_stampers_from_state( + &self.state_manager, + self.headers.get(&new_block.header.previous_hash).unwrap(), + &new_block.header + ).values().sum(); + if new_reputation > existing_reputation || ( + new_reputation == existing_reputation && new_block.header.tail.n_stamps() > current_deepest.header.tail.n_stamps() + ){ + true + } else { + false + } + } else { // self.depth < new_block.header.depth + true + } + } + /// Call this only after a block has been verified #[instrument(skip_all, fields(block = ?block.header.completion))] fn settle_new_block(&mut self, block: Block) -> Result<(), BlockValidationError>{ @@ -309,29 +340,10 @@ impl Chain { tracing::debug!("Block settled in chain, but need to update depth."); // update the depth - the depth of this block is checked in the verification // perhaps this is a fork deeper in the chain, so we do not always update - if block.header.depth > self.depth { + if self.block_greater(&block) { tracing::info!("Chain depth expanded to {}", block.header.depth); self.deepest_hash = block.header.completion.as_ref().unwrap().hash; self.depth = block.header.depth; - } else if block.header.depth == self.depth { - // take the block which is stamped with the most reputation - let current_deepest = self.get_top_block().unwrap(); - let existing_reputation: f64 = get_current_reputations_for_stampers_from_state( - &self.state_manager, - self.headers.get(¤t_deepest.header.previous_hash).unwrap(), - ¤t_deepest.header - ).values().sum(); - let new_reputation: f64 = get_current_reputations_for_stampers_from_state( - &self.state_manager, - self.headers.get(&block.header.previous_hash).unwrap(), - &block.header - ).values().sum(); - if new_reputation > existing_reputation || ( - new_reputation == existing_reputation && block.header.tail.n_stamps() > current_deepest.header.tail.n_stamps() - ){ - tracing::info!("Chain depth fork replaced deepest block with higher reputation fork"); - self.deepest_hash = block.header.completion.as_ref().unwrap().hash; - } } Ok(()) } diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index 13594f2..302a103 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -1,9 +1,10 @@ -use std::{collections::HashMap, path::PathBuf}; +use std::{collections::{HashMap, HashSet}, path::PathBuf}; -use pillar_crypto::types::StdByteArray; -use pillar_serialize::PillarSerialize; +use bytemuck::{Pod, Zeroable}; +use pillar_crypto::{hashing::Hashable, types::StdByteArray}; +use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; -use crate::{blockchain::chain::Chain, primitives::block::Block}; +use crate::{accounting::state::StateManager, blockchain::chain::Chain, primitives::block::{Block, BlockHeader}}; pub mod database; @@ -20,7 +21,53 @@ pub(crate) trait Persistable } } +impl Persistable for Vec where V: PillarSerialize {} + impl Persistable for Block{} // we are using the default implementations, which is just serialize and save to a path +impl Persistable for StateManager {} + +#[inline] +fn load_blocks_from_dir(path: &PathBuf) -> Result, std::io::Error>{ + let block_files = std::fs::read_dir(path)?; + let mut blocks: HashMap = HashMap::new(); + for entry in block_files { + let entry = entry?; + if entry.file_type()?.is_file() { + let hash: StdByteArray = hex::decode(entry.file_name().to_str().unwrap()) + .unwrap() + .try_into() + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid hash in filename"))?; + let block = Block::deserialize_pillar(&std::fs::read(entry.path())?)?; + blocks.insert(hash, block); + } + } + Ok(blocks) +} + +#[inline] +fn get_headers_from_blocks(blocks: &HashMap) -> HashMap { + let mut headers = HashMap::new(); + for (hash, block) in blocks { + headers.insert(*hash, block.header.clone()); + } + headers +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +struct ChainMeta { + deepest_hash: StdByteArray, + depth: u64, + _padding: [u8; 24], // padding to make the struct size a multiple of 32 bytes +} + +impl PillarNativeEndian for ChainMeta { + fn to_le(&mut self) { + self.depth = self.depth.to_le(); + } +} +impl PillarFixedSize for ChainMeta {} +impl Persistable for ChainMeta {} // chains serialization is to save one block per file impl Persistable for Chain { @@ -31,7 +78,19 @@ impl Persistable for Chain { for (block_hash, block) in &self.blocks { let block_path = path.join(format!("{}.bin", hex::encode(block_hash))); block.save(&block_path).await?; - } + }; + let metadata = ChainMeta { + deepest_hash: self.deepest_hash, + depth: self.depth, + _padding: [0u8; 24], + }; + let meta_path = path.join("meta.bin"); + let state_manager_path = path.join("state.bin"); + let leaves_path = path.join("leaves.bin"); + metadata.save(&meta_path).await?; + // TODO this clone sus af (implment serialization for reference types) + self.leaves.iter().cloned().collect::>().save(&leaves_path).await?; + self.state_manager.save(&state_manager_path).await?; Ok(()) } @@ -39,21 +98,23 @@ impl Persistable for Chain { if !std::fs::metadata(path)?.is_dir(){ return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Path must be a directory")); } - let block_files = std::fs::read_dir(path)?; - let mut blocks: HashMap = HashMap::new(); - for entry in block_files { - let entry = entry?; - if entry.file_type()?.is_file() { - let hash: StdByteArray = hex::decode(entry.file_name().to_str().unwrap()) - .unwrap() - .try_into() - .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid hash in filename"))?; - let block = Block::deserialize_pillar(&tokio::fs::read(entry.path()).await?)?; - blocks.insert(hash, block); - } - } - let chain = unsafe{Chain::new_from_blocks(blocks)}; - todo!("remove unsafe call"); + let blocks = load_blocks_from_dir(path)?; + let headers = get_headers_from_blocks(&blocks); + let metadata = ChainMeta::load(&path.join("meta.bin")).await?; + let state_manager = StateManager::load(&path.join("state.bin")).await?; + let leaves = Vec::::load(&path.join("leaves.bin")).await? + .iter() + .cloned() + .collect::>(); + + let chain = Chain { + deepest_hash: metadata.deepest_hash, + depth: metadata.depth, + blocks, + headers, + leaves, + state_manager, + }; Ok(chain) } } \ No newline at end of file From a986381d16ee632a84f21c002c8254e0648e5507 Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Mon, 13 Oct 2025 16:17:32 -0400 Subject: [PATCH 06/11] persistence working --- Cargo.lock | 49 ++++ libs/pillar_core/Cargo.toml | 1 + libs/pillar_core/src/persistence/mod.rs | 212 ++++++++++++++++-- .../pillar_core/src/protocol/serialization.rs | 8 +- libs/pillar_serialize/src/lib.rs | 4 +- 5 files changed, 250 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19d4bbb..cc4361f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,6 +523,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fiat-crypto" version = "0.2.9" @@ -886,6 +902,12 @@ version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "lock_api" version = "0.4.13" @@ -1120,6 +1142,7 @@ dependencies = [ "rand", "sled", "slotmap", + "tempfile", "tokio", "tracing", "tracing-appender", @@ -1300,6 +1323,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +dependencies = [ + "bitflags 2.9.1", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.60.2", +] + [[package]] name = "rustversion" version = "1.0.21" @@ -1556,6 +1592,19 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +[[package]] +name = "tempfile" +version = "3.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +dependencies = [ + "fastrand", + "getrandom 0.3.3", + "once_cell", + "rustix", + "windows-sys 0.60.2", +] + [[package]] name = "thiserror" version = "1.0.69" diff --git a/libs/pillar_core/Cargo.toml b/libs/pillar_core/Cargo.toml index 9105705..4be1f45 100644 --- a/libs/pillar_core/Cargo.toml +++ b/libs/pillar_core/Cargo.toml @@ -24,3 +24,4 @@ bytemuck = {version="1.23.2", features=["derive"]} bytes = "1.10.1" byteorder = "1.5.0" hex = "0.4.3" +tempfile = "3.23.0" diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index 302a103..a53bead 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -1,17 +1,25 @@ -use std::{collections::{HashMap, HashSet}, path::PathBuf}; +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, +}; use bytemuck::{Pod, Zeroable}; use pillar_crypto::{hashing::Hashable, types::StdByteArray}; use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; -use crate::{accounting::state::StateManager, blockchain::chain::Chain, primitives::block::{Block, BlockHeader}}; +use crate::{ + accounting::state::StateManager, + blockchain::chain::Chain, + primitives::block::{Block, BlockHeader}, +}; pub mod database; pub(crate) trait Persistable - where Self: Sized + PillarSerialize +where + Self: Sized + PillarSerialize, { - async fn save(&self, path: &PathBuf) -> Result<(), std::io::Error>{ + async fn save(&self, path: &PathBuf) -> Result<(), std::io::Error> { let bytes = self.serialize_pillar()?; tokio::fs::write(path, bytes).await } @@ -23,20 +31,22 @@ pub(crate) trait Persistable impl Persistable for Vec where V: PillarSerialize {} -impl Persistable for Block{} // we are using the default implementations, which is just serialize and save to a path +impl Persistable for Block {} // we are using the default implementations, which is just serialize and save to a path impl Persistable for StateManager {} #[inline] -fn load_blocks_from_dir(path: &PathBuf) -> Result, std::io::Error>{ +fn load_blocks_from_dir(path: &PathBuf) -> Result, std::io::Error> { let block_files = std::fs::read_dir(path)?; let mut blocks: HashMap = HashMap::new(); for entry in block_files { let entry = entry?; - if entry.file_type()?.is_file() { - let hash: StdByteArray = hex::decode(entry.file_name().to_str().unwrap()) + if entry.file_type()?.is_file() && entry.file_name().len() == 68 { + let hash: StdByteArray = hex::decode(entry.file_name().to_string_lossy().split(".").next().unwrap()) .unwrap() .try_into() - .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid hash in filename"))?; + .map_err(|_| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid hash in filename") + })?; let block = Block::deserialize_pillar(&std::fs::read(entry.path())?)?; blocks.insert(hash, block); } @@ -45,7 +55,9 @@ fn load_blocks_from_dir(path: &PathBuf) -> Result, } #[inline] -fn get_headers_from_blocks(blocks: &HashMap) -> HashMap { +fn get_headers_from_blocks( + blocks: &HashMap, +) -> HashMap { let mut headers = HashMap::new(); for (hash, block) in blocks { headers.insert(*hash, block.header.clone()); @@ -72,13 +84,17 @@ impl Persistable for ChainMeta {} // chains serialization is to save one block per file impl Persistable for Chain { async fn save(&self, path: &std::path::PathBuf) -> Result<(), std::io::Error> { - if !std::fs::metadata(path)?.is_dir(){ - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Path must be a directory")); + if !std::fs::metadata(path)?.is_dir() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Path must be a directory", + )); } for (block_hash, block) in &self.blocks { + // println!("{}", hex::encode(block_hash)); let block_path = path.join(format!("{}.bin", hex::encode(block_hash))); block.save(&block_path).await?; - }; + } let metadata = ChainMeta { deepest_hash: self.deepest_hash, depth: self.depth, @@ -89,20 +105,29 @@ impl Persistable for Chain { let leaves_path = path.join("leaves.bin"); metadata.save(&meta_path).await?; // TODO this clone sus af (implment serialization for reference types) - self.leaves.iter().cloned().collect::>().save(&leaves_path).await?; + self.leaves + .iter() + .cloned() + .collect::>() + .save(&leaves_path) + .await?; self.state_manager.save(&state_manager_path).await?; Ok(()) } async fn load(path: &std::path::PathBuf) -> Result { - if !std::fs::metadata(path)?.is_dir(){ - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Path must be a directory")); + if !std::fs::metadata(path)?.is_dir() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Path must be a directory", + )); } let blocks = load_blocks_from_dir(path)?; let headers = get_headers_from_blocks(&blocks); let metadata = ChainMeta::load(&path.join("meta.bin")).await?; let state_manager = StateManager::load(&path.join("state.bin")).await?; - let leaves = Vec::::load(&path.join("leaves.bin")).await? + let leaves = Vec::::load(&path.join("leaves.bin")) + .await? .iter() .cloned() .collect::>(); @@ -117,4 +142,155 @@ impl Persistable for Chain { }; Ok(chain) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::path::PathBuf; + + use pillar_crypto::hashing::DefaultHash; + use pillar_crypto::signing::{DefaultSigner, SigFunction, SigVerFunction, Signable}; + + use crate::blockchain::chain::Chain; + use crate::persistence::Persistable; + use crate::primitives::block::{Block, BlockTail}; + use crate::primitives::transaction::Transaction; + use crate::protocol::pow::mine; + + #[tokio::test] + async fn test_persist_block() { + let t = Transaction::new([1; 32], [2; 32], 0, 0, 0, &mut DefaultHash::new()); + let block = Block::new( + [0; 32], + 0, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + vec![t], + None, + BlockTail::default().stamps, + 0, + None, + None, + &mut DefaultHash::new(), + ); + let file = tempfile::NamedTempFile::new().unwrap(); + let path = PathBuf::from(file.path()); + block.save(&path).await.unwrap(); + + let loaded_block = Block::load(&path).await.unwrap(); + assert_eq!(block, loaded_block); + } + + #[tokio::test] + async fn test_chain_persistence() { + let mut chain = Chain::new_with_genesis(); + let mut signing_key = DefaultSigner::generate_random(); + let sender = signing_key.get_verifying_function().to_bytes(); + + let mut main_hash = chain.deepest_hash; + let genesis_hash = main_hash; + + // Extend the main chain to depth 10 + for depth in 1..=15 { + let mut transaction = + Transaction::new(sender, [2; 32], 0, 0, depth - 1, &mut DefaultHash::new()); + transaction.sign(&mut signing_key); + let mut block = Block::new( + main_hash, + 0, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + + depth, + vec![transaction], + None, + BlockTail::default().stamps, + depth, + None, + None, + &mut DefaultHash::new(), + ); + let prev_header = chain + .headers + .get(&block.header.previous_hash) + .expect("Previous block header not found"); + let state_root = + chain + .state_manager + .branch_from_block_internal(&block, prev_header, &sender); + mine( + &mut block, + sender, + state_root, + vec![], + None, + DefaultHash::new(), + ) + .await; + main_hash = block.header.completion.as_ref().unwrap().hash; + chain.add_new_block(block).unwrap(); + } + + // Create a fork that shares some nodes with the main chain + let mut fork_hash = genesis_hash; + for depth in 1..=3 { + let mut transaction = + Transaction::new(sender, [2; 32], 0, 0, depth - 1, &mut DefaultHash::new()); + transaction.sign(&mut signing_key); + let mut fork_block = Block::new( + fork_hash, + 0, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + + depth + + 15, + vec![transaction], + None, + BlockTail::default().stamps, + depth, + None, + None, + &mut DefaultHash::new(), + ); + let prev_header = chain + .headers + .get(&fork_block.header.previous_hash) + .expect("Previous block header not found"); + let state_root = + chain + .state_manager + .branch_from_block_internal(&fork_block, prev_header, &sender); + mine( + &mut fork_block, + sender, + state_root, + vec![], + None, + DefaultHash::new(), + ) + .await; + fork_hash = fork_block.header.completion.as_ref().unwrap().hash; + chain.add_new_block(fork_block).unwrap(); + } + + // write the chain, then get a new one + + let temp_dir = tempfile::tempdir().unwrap(); + let path = PathBuf::from(temp_dir.path()); + chain.save(&path).await.unwrap(); + + let new_chain = Chain::load(&path).await.unwrap(); + + assert_eq!(new_chain.depth, chain.depth); + assert_eq!(new_chain.leaves, chain.leaves); + assert_eq!(new_chain.blocks.len(), chain.blocks.len()); + assert_eq!(new_chain.headers.len(), chain.headers.len()); + assert_eq!(new_chain.deepest_hash, chain.deepest_hash); + } +} diff --git a/libs/pillar_core/src/protocol/serialization.rs b/libs/pillar_core/src/protocol/serialization.rs index b131843..e456341 100644 --- a/libs/pillar_core/src/protocol/serialization.rs +++ b/libs/pillar_core/src/protocol/serialization.rs @@ -168,10 +168,10 @@ impl PillarSerialize for crate::primitives::messages::Message { impl PillarSerialize for StateManager { fn serialize_pillar(&self) -> Result, std::io::Error> { let mut bytes = vec![]; - let reputation_bytes = self.reputations.serialize_pillar()?; - bytes.extend((reputation_bytes.len() as u32).to_le_bytes()); - bytes.extend(reputation_bytes); - + let trie_bytes = self.state_trie.serialize_pillar()?; + bytes.extend((trie_bytes.len() as u32).to_le_bytes()); + bytes.extend(trie_bytes); + bytes.extend(self.reputations.serialize_pillar()?); Ok(bytes) } diff --git a/libs/pillar_serialize/src/lib.rs b/libs/pillar_serialize/src/lib.rs index 91a0160..d1a3eca 100644 --- a/libs/pillar_serialize/src/lib.rs +++ b/libs/pillar_serialize/src/lib.rs @@ -252,7 +252,7 @@ where impl PillarSerialize for HashMap where - V: PillarSerialize + V: PillarSerialize + PillarFixedSize { /// Specialized map encoding for 32-byte keys; value length is still prefixed. default fn serialize_pillar(&self) -> Result, std::io::Error> { @@ -288,7 +288,7 @@ where /// a common hashmap implmentation for address/hash to fixed size value impl PillarSerialize for HashMap where - V: PillarSerialize + PillarNativeEndian + Zeroable + Pod + V: PillarSerialize + PillarNativeEndian + Zeroable + Pod + PillarFixedSize { /// Tight map encoding for 32-byte keys and fixed-size values (no per-entry length). fn serialize_pillar(&self) -> Result, std::io::Error> { From f9b78fff7e7eb0c078fde046c8ed3441f35ef123 Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Mon, 13 Oct 2025 16:18:45 -0400 Subject: [PATCH 07/11] clippy --- libs/pillar_core/src/accounting/wallet.rs | 1 - libs/pillar_core/src/blockchain/chain.rs | 14 +++++--------- libs/pillar_core/src/nodes/node.rs | 8 ++++---- libs/pillar_core/src/persistence/database.rs | 12 ++++++++++++ libs/pillar_core/src/persistence/mod.rs | 4 ++-- .../src/primitives/implementations/block.rs | 2 +- .../src/primitives/implementations/tail.rs | 2 +- libs/pillar_core/src/primitives/pool.rs | 6 ++++++ libs/pillar_core/src/protocol/chain.rs | 4 ++-- libs/pillar_core/src/protocol/peers.rs | 2 +- libs/pillar_core/src/protocol/serialization.rs | 8 ++++---- 11 files changed, 38 insertions(+), 25 deletions(-) diff --git a/libs/pillar_core/src/accounting/wallet.rs b/libs/pillar_core/src/accounting/wallet.rs index dfc2bb4..f48544b 100644 --- a/libs/pillar_core/src/accounting/wallet.rs +++ b/libs/pillar_core/src/accounting/wallet.rs @@ -1,5 +1,4 @@ //! Wallet wrapper around an ed25519 keypair with convenience methods. -use bytemuck::{Pod, Zeroable}; use pillar_crypto::{signing::{DefaultSigner, DefaultVerifier, SigFunction, SigVerFunction, Signable}, types::{StdByteArray, STANDARD_ARRAY_LENGTH}}; use pillar_serialize::PillarSerialize; diff --git a/libs/pillar_core/src/blockchain/chain.rs b/libs/pillar_core/src/blockchain/chain.rs index 22e9368..172919e 100644 --- a/libs/pillar_core/src/blockchain/chain.rs +++ b/libs/pillar_core/src/blockchain/chain.rs @@ -5,7 +5,7 @@ use pillar_crypto::{hashing::DefaultHash, signing::{DefaultVerifier, SigVerFunct use tracing::instrument; use crate::{ - accounting::{account::Account, state::StateManager}, persistence::{ Persistable}, primitives::{block::{Block, BlockHeader}, errors::BlockValidationError, transaction::Transaction}, protocol::{chain::get_genesis_block, pow::get_difficulty_for_block, reputation::{get_current_reputations_for_stampers, get_current_reputations_for_stampers_from_state}} + accounting::{account::Account, state::StateManager}, primitives::{block::{Block, BlockHeader}, errors::BlockValidationError, transaction::Transaction}, protocol::{chain::get_genesis_block, pow::get_difficulty_for_block, reputation::{get_current_reputations_for_stampers, get_current_reputations_for_stampers_from_state}} }; use super::TrimmableChain; @@ -305,13 +305,9 @@ impl Chain { self.headers.get(&new_block.header.previous_hash).unwrap(), &new_block.header ).values().sum(); - if new_reputation > existing_reputation || ( + new_reputation > existing_reputation || ( new_reputation == existing_reputation && new_block.header.tail.n_stamps() > current_deepest.header.tail.n_stamps() - ){ - true - } else { - false - } + ) } else { // self.depth < new_block.header.depth true } @@ -381,7 +377,7 @@ impl Chain { let deepest = &self.deepest_hash; let mut stack: Vec = self.leaves.iter().filter(|x| { - let b = self.get_block(*x).unwrap(); + let b = self.get_block(x).unwrap(); x != &deepest && b.header.depth >= min_depth.unwrap_or(0) }).cloned().collect(); stack.push(*deepest); @@ -404,7 +400,7 @@ impl Chain { } stack.push(b.header.previous_hash); // this block depth is 1 greater than previous - if b.header.depth+1 <= max_depth.unwrap_or(u64::MAX) { + if b.header.depth < max_depth.unwrap_or(u64::MAX) { result.push(block); } diff --git a/libs/pillar_core/src/nodes/node.rs b/libs/pillar_core/src/nodes/node.rs index 3bde468..1309e2b 100644 --- a/libs/pillar_core/src/nodes/node.rs +++ b/libs/pillar_core/src/nodes/node.rs @@ -61,9 +61,9 @@ impl NodeState { } } -impl Into for NodeState { - fn into(self) -> String { - match self { +impl From for String { + fn from(val: NodeState) -> Self { + match val { NodeState::ChainLoading => "ChainLoading", NodeState::ChainOutdated => "ChainOutdated", NodeState::ChainSyncing => "ChainSyncing", @@ -243,7 +243,7 @@ impl Node { Some(handle) }, _ => { - panic!("Unexpected node state for initialization: {:?}", state); + panic!("Unexpected node state for initialization: {state:?}"); } }; if let Some(handle) = handle { diff --git a/libs/pillar_core/src/persistence/database.rs b/libs/pillar_core/src/persistence/database.rs index 6cb1ed1..d1cc040 100644 --- a/libs/pillar_core/src/persistence/database.rs +++ b/libs/pillar_core/src/persistence/database.rs @@ -39,6 +39,12 @@ pub struct GenesisDatastore{ chain: Chain } +impl Default for GenesisDatastore { + fn default() -> Self { + Self::new() + } +} + impl GenesisDatastore { pub fn new() -> Self { GenesisDatastore { @@ -79,6 +85,12 @@ pub struct EmptyDatastore{ chain: Option } +impl Default for EmptyDatastore { + fn default() -> Self { + Self::new() + } +} + impl EmptyDatastore { pub fn new() -> Self { EmptyDatastore { diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index a53bead..0a3c6d7 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -4,7 +4,7 @@ use std::{ }; use bytemuck::{Pod, Zeroable}; -use pillar_crypto::{hashing::Hashable, types::StdByteArray}; +use pillar_crypto::types::StdByteArray; use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; use crate::{ @@ -60,7 +60,7 @@ fn get_headers_from_blocks( ) -> HashMap { let mut headers = HashMap::new(); for (hash, block) in blocks { - headers.insert(*hash, block.header.clone()); + headers.insert(*hash, block.header); } headers } diff --git a/libs/pillar_core/src/primitives/implementations/block.rs b/libs/pillar_core/src/primitives/implementations/block.rs index 0ccef50..0bdf8ca 100644 --- a/libs/pillar_core/src/primitives/implementations/block.rs +++ b/libs/pillar_core/src/primitives/implementations/block.rs @@ -1,7 +1,7 @@ use pillar_crypto::{hashing::{DefaultHash, HashFunction}, merkle::{generate_tree, MerkleTree}, proofs::{generate_proof_of_inclusion, verify_proof_of_inclusion, MerkleProof}, types::StdByteArray}; -use crate::{persistence::Persistable, primitives::{block::{Block, BlockHeader, BlockTail, Stamp}, transaction::Transaction}, protocol::reputation::N_TRANSMISSION_SIGNATURES}; +use crate::{primitives::{block::{Block, BlockHeader, BlockTail, Stamp}, transaction::Transaction}, protocol::reputation::N_TRANSMISSION_SIGNATURES}; impl Block { /// Create a new block diff --git a/libs/pillar_core/src/primitives/implementations/tail.rs b/libs/pillar_core/src/primitives/implementations/tail.rs index 008f914..8eaf387 100644 --- a/libs/pillar_core/src/primitives/implementations/tail.rs +++ b/libs/pillar_core/src/primitives/implementations/tail.rs @@ -1,4 +1,4 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; use pillar_crypto::{hashing::Hashable, signing::{DefaultVerifier, SigVerFunction, Signable}, types::StdByteArray}; diff --git a/libs/pillar_core/src/primitives/pool.rs b/libs/pillar_core/src/primitives/pool.rs index aeab9f1..342cb90 100644 --- a/libs/pillar_core/src/primitives/pool.rs +++ b/libs/pillar_core/src/primitives/pool.rs @@ -21,6 +21,12 @@ pub struct MinerPool { /// Transaction pool for now is just a vector of transactions /// In the future, it will be a more complex structure - perhaps a max heap on the transaction fee /// Rn, FIFO +impl Default for MinerPool { + fn default() -> Self { + Self::new() + } +} + impl MinerPool{ pub fn new() -> Self { let (mine_abort_sender, mine_abort_receiver) = flume::unbounded(); diff --git a/libs/pillar_core/src/protocol/chain.rs b/libs/pillar_core/src/protocol/chain.rs index a3a6f09..c918712 100644 --- a/libs/pillar_core/src/protocol/chain.rs +++ b/libs/pillar_core/src/protocol/chain.rs @@ -332,9 +332,9 @@ pub async fn block_settle_consumer(node: Node, stop_signal: Option { Ok(peer) } - _ => Err(std::io::Error::other(format!("Invalid message received: {:?}", response))), + _ => Err(std::io::Error::other(format!("Invalid message received: {response:?}"))), } } diff --git a/libs/pillar_core/src/protocol/serialization.rs b/libs/pillar_core/src/protocol/serialization.rs index e456341..67590b9 100644 --- a/libs/pillar_core/src/protocol/serialization.rs +++ b/libs/pillar_core/src/protocol/serialization.rs @@ -423,21 +423,21 @@ impl PillarSerialize for Account { } mod tests { - use pillar_crypto::hashing::{DefaultHash, Hashable}; + use pillar_crypto::hashing::DefaultHash; use pillar_serialize::PillarSerialize; use crate::{ primitives::{ - block::{Block, BlockHeader, Stamp}, + block::{Block, Stamp}, messages::Message, transaction::{Transaction, TransactionFilter}, }, - protocol::{reputation::N_TRANSMISSION_SIGNATURES, versions::Versions}, + protocol::reputation::N_TRANSMISSION_SIGNATURES, blockchain::{chain::Chain, chain_shard::ChainShard}, nodes::peer::Peer, accounting::account::TransactionStub, }; - use pillar_crypto::{types::StdByteArray, proofs::{MerkleProof, HashDirection}}; + use pillar_crypto::proofs::{MerkleProof, HashDirection}; use std::net::{IpAddr, Ipv4Addr}; #[test] From 414bf8cebfee97072f405f442a60dd5c822ada2f Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Mon, 13 Oct 2025 16:20:20 -0400 Subject: [PATCH 08/11] clippy fix fix --- libs/pillar_core/src/protocol/chain.rs | 4 ++++ libs/pillar_core/src/protocol/serialization.rs | 12 ++++-------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/libs/pillar_core/src/protocol/chain.rs b/libs/pillar_core/src/protocol/chain.rs index c918712..c909f55 100644 --- a/libs/pillar_core/src/protocol/chain.rs +++ b/libs/pillar_core/src/protocol/chain.rs @@ -332,6 +332,10 @@ pub async fn block_settle_consumer(node: Node, stop_signal: Option Date: Mon, 13 Oct 2025 18:34:53 -0400 Subject: [PATCH 09/11] Introduce startup modes, which eliminates datastore objects. Saving will be functional and explicit --- libs/pillar_core/src/nodes/miner.rs | 4 +- libs/pillar_core/src/nodes/mod.rs | 9 +- libs/pillar_core/src/nodes/node.rs | 45 ++--- libs/pillar_core/src/persistence/database.rs | 190 ------------------ libs/pillar_core/src/persistence/mod.rs | 2 - .../pillar_core/src/protocol/communication.rs | 11 +- libs/pillar_core/src/protocol/peers.rs | 3 +- pillar/src/run.rs | 4 +- 8 files changed, 35 insertions(+), 233 deletions(-) delete mode 100644 libs/pillar_core/src/persistence/database.rs diff --git a/libs/pillar_core/src/nodes/miner.rs b/libs/pillar_core/src/nodes/miner.rs index 7c59565..91da347 100644 --- a/libs/pillar_core/src/nodes/miner.rs +++ b/libs/pillar_core/src/nodes/miner.rs @@ -156,7 +156,7 @@ mod test{ use pillar_crypto::hashing::DefaultHash; - use crate::{persistence::database::GenesisDatastore, primitives::{block::{Block, BlockTail}, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::MIN_DIFFICULTY, pow::mine}}; + use crate::{nodes::node::StartupModes, primitives::{block::{Block, BlockTail}, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::MIN_DIFFICULTY, pow::mine}}; use crate::nodes::miner::Miner; use super::Node; @@ -166,7 +166,7 @@ mod test{ let private_key = [2u8; 32]; let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8080; - let node = Node::new(public_key, private_key, ip_address, port, vec![], Some(Arc::new(GenesisDatastore::new())), Some(MinerPool::new())); + let node = Node::new(public_key, private_key, ip_address, port, vec![], StartupModes::Genesis, Some(MinerPool::new())); let miner = Miner::new(node).unwrap(); let mut hasher = DefaultHash::new(); diff --git a/libs/pillar_core/src/nodes/mod.rs b/libs/pillar_core/src/nodes/mod.rs index f5dc8b8..4053974 100644 --- a/libs/pillar_core/src/nodes/mod.rs +++ b/libs/pillar_core/src/nodes/mod.rs @@ -19,8 +19,8 @@ mod tests { use crate::{ accounting::{account, wallet::Wallet}, nodes::{ - miner::{Miner, MAX_TRANSACTION_WAIT_TIME}, node::{self, NodeState}, peer::Peer - }, persistence::database::GenesisDatastore, primitives::{messages::Message, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::get_reward_from_depth_and_stampers, peers::{discover_peer, discover_peers}, transactions::{get_transaction_proof, submit_transaction}} + miner::{Miner, MAX_TRANSACTION_WAIT_TIME}, node::{self, NodeState, StartupModes}, peer::Peer + }, primitives::{messages::Message, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::get_reward_from_depth_and_stampers, peers::{discover_peer, discover_peers}, transactions::{get_transaction_proof, submit_transaction}} }; use super::node::Node; @@ -100,7 +100,7 @@ mod tests { ip_address, port, peers.clone(), - if genesis_store {Some(Arc::new(GenesisDatastore::new()))} else {None}, + if genesis_store {StartupModes::Genesis} else {StartupModes::Empty}, miner_pool.clone(), ); @@ -129,7 +129,6 @@ mod tests { let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); let port = 8070; let peers = vec![]; - let datastore = GenesisDatastore::new(); let transaction_pool = None; let mut node = Node::new( @@ -138,7 +137,7 @@ mod tests { ip_address, port, peers, - Some(Arc::new(datastore)), + StartupModes::Genesis, transaction_pool, ); diff --git a/libs/pillar_core/src/nodes/node.rs b/libs/pillar_core/src/nodes/node.rs index 1309e2b..0b9ac07 100644 --- a/libs/pillar_core/src/nodes/node.rs +++ b/libs/pillar_core/src/nodes/node.rs @@ -2,12 +2,11 @@ use super::peer::Peer; use flume::{Receiver, Sender}; use pillar_crypto::{hashing::{DefaultHash, Hashable}, signing::{DefaultSigner, SigFunction, Signable}, types::StdByteArray}; use tracing::instrument; -use std::{collections::{HashMap, HashSet}, net::IpAddr, sync::Arc}; +use std::{collections::{HashMap, HashSet}, net::IpAddr, path::PathBuf, sync::Arc}; use tokio::sync::{Mutex, RwLock}; use crate::{ blockchain::chain::Chain, - persistence::database::{Datastore, EmptyDatastore}, primitives::{block::{Block, BlockHeader, Stamp}, messages::Message, pool::MinerPool, transaction::{FilterMatch, TransactionFilter}}, protocol::{chain::{block_settle_consumer, discover_chain, service_sync, sync_chain}, communication::{broadcast_knowledge, serve_peers}, @@ -75,23 +74,18 @@ impl From for String { } } -fn get_initial_state(datastore: &dyn Datastore) -> (NodeState, Option) { +fn get_initial_state(startup_mode: &StartupModes) -> (NodeState, Option) { // check if the datastore has a chain - if datastore.latest_chain().is_some() { - // if it does, load the chain - match datastore.load_chain() { - Ok(chain) => { - // assign the chain to the node - (NodeState::ChainOutdated, Some(chain)) - }, - Err(_) => { - // if it fails, we are in discovery mode - (NodeState::ICD, None) - } + match startup_mode { + StartupModes::Empty => (NodeState::ICD, None), + StartupModes::Load(path) => { + // if it does, load the chain + todo!(); + }, + StartupModes::Genesis => { + // if it does not, we are in discovery mode + (NodeState::ChainOutdated, Some(Chain::new_with_genesis())) } - } else { - // if it does not, we are in discovery mode - (NodeState::ICD, None) } } @@ -110,8 +104,6 @@ pub struct NodeInner { pub broadcasted_already: RwLock>, // transaction filter queue pub transaction_filters: Mutex>, - /// the datastore - pub datastore: Option>, /// A queue of blocks which are to be settled to the chain pub late_settle_queue: lfqueue::UnboundedQueue, /// the state represents the nodes ability to communicate with other nodes @@ -136,6 +128,12 @@ pub struct Node { kill_settle: Option>, } +pub enum StartupModes { + Load(PathBuf), + Genesis, + Empty, +} + impl Node { @@ -155,13 +153,9 @@ impl Node { ip_address: IpAddr, port: u16, peers: Vec, - mut database: Option>, + startup_mode: StartupModes, transaction_pool: Option, ) -> Self { - if database.is_none() { - tracing::warn!("Node created without a database. This will not persist the chain or transactions."); - database = Some(Arc::new(EmptyDatastore::new())); - } let broadcast_queue = lfqueue::UnboundedQueue::new(); let late_settle_queue = lfqueue::UnboundedQueue::new(); let transaction_filters = Mutex::new(Vec::new()); @@ -171,7 +165,7 @@ impl Node { .map(|peer| (peer.public_key, *peer)) .collect::>(); tracing::info!("Node created with {} initial peers", peer_map.len()); - let (state, maybe_chain) = get_initial_state(&**database.as_ref().unwrap()); + let (state, maybe_chain) = get_initial_state(&startup_mode); tracing::debug!("Node initial state: {:?}", state); Node { inner: NodeInner { @@ -185,7 +179,6 @@ impl Node { filter_callbacks: Mutex::new(HashMap::new()), // initially no callbacks state: RwLock::new(state), // initially in discovery mode late_settle_queue, - datastore: database, }.into(), ip_address, port, diff --git a/libs/pillar_core/src/persistence/database.rs b/libs/pillar_core/src/persistence/database.rs deleted file mode 100644 index d1cc040..0000000 --- a/libs/pillar_core/src/persistence/database.rs +++ /dev/null @@ -1,190 +0,0 @@ -use std::collections::HashMap; - - -use pillar_crypto::types::StdByteArray; - -use crate::{blockchain::chain::Chain, primitives::block::Block}; - -pub trait Datastore: Send + Sync { - /// If a chain exists on disk. - /// - /// Returns `Some(chain_timestamp)` if the chain exists, or `None` if it does not. - fn latest_chain(&self) -> Option; - - /// Loads chain from disk. - /// - /// Returns a `Chain` if it exists, or an error if it does not. - fn load_chain(&self) -> Result; - - /// Saves a chain to disk. - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error>; - - /// Saves a block to disk. - /// - /// Returns `Ok(())` if the block was saved successfully, or an error if it was not. - fn save_block(&self, block: Block) -> Result<(), std::io::Error>; - - /// Loads a block from disk. - fn load_block(&self, block_hash: &str) -> Result; - - /// sync the on disk state with a new chain - /// This will write/remove as needed to ensure the on disk state matches the chain. - fn sync_chain(&self, chain: Chain) -> Result<(), std::io::Error>; - -} - -/// The most basic datastore that is essentially memory based without any persistence. -#[derive(Clone)] -pub struct GenesisDatastore{ - chain: Chain -} - -impl Default for GenesisDatastore { - fn default() -> Self { - Self::new() - } -} - -impl GenesisDatastore { - pub fn new() -> Self { - GenesisDatastore { - chain: Chain::new_with_genesis(), - } - } -} - -impl Datastore for GenesisDatastore { - fn latest_chain(&self) -> Option { - self.chain.leaves.iter().last().map(|leaf| self.chain.blocks.get(leaf).unwrap().header.timestamp) - } - - fn load_chain(&self) -> Result { - Ok(self.chain.clone()) - } - - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error> { - self.chain = chain; - Ok(()) - } - - fn save_block(&self, _block: Block) -> Result<(), std::io::Error> { - Ok(()) - } - - fn load_block(&self, _block_hash: &str) -> Result { - unimplemented!() - } - - fn sync_chain(&self, _chain: Chain) -> Result<(), std::io::Error> { - unimplemented!() - } -} - -/// This datastore never provides any chain, but it can store. -pub struct EmptyDatastore{ - chain: Option -} - -impl Default for EmptyDatastore { - fn default() -> Self { - Self::new() - } -} - -impl EmptyDatastore { - pub fn new() -> Self { - EmptyDatastore { - chain: None, - } - } -} - -impl Datastore for EmptyDatastore { - fn latest_chain(&self) -> Option { - match &self.chain{ - Some(chain) => chain.leaves.iter().last().map(|leaf| chain.blocks.get(leaf).unwrap().header.timestamp), - None => None, - } - } - - fn load_chain(&self) -> Result { - match &self.chain { - Some(chain) => Ok(chain.clone()), - None => Err(std::io::Error::new(std::io::ErrorKind::NotFound, "No chain found")), - } - } - - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error> { - self.chain = Some(chain); - Ok(()) - } - - fn save_block(&self, _block: Block) -> Result<(), std::io::Error> { - Ok(()) - } - - fn load_block(&self, _block_hash: &str) -> Result { - unimplemented!() - } - - fn sync_chain(&self, _chain: Chain) -> Result<(), std::io::Error> { - unimplemented!() - } -} - - - - - -pub struct SledDatastore { - data: sled::Db, -} - -impl SledDatastore { - pub fn new(address: String) -> Self { - SledDatastore { - data: sled::open(address).expect("Failed to open sled database"), - } - } -} - -impl Datastore for SledDatastore { - fn latest_chain(&self) -> Option { - let timestamp = self.data.get("latest_timestamp"); - match timestamp { - Ok(Some(value)) => { - let timestamp = u64::from_le_bytes( - value.as_ref().try_into().expect("Failed to convert timestamp to u64") - ); - timestamp.into() - }, - _ => None, - } - } - - fn load_chain(&self) -> Result { - let leaf_hashes = self.data.get("leaf_hashes") - .map_err(std::io::Error::other)?; - // the leaf hash's will point off to the respective blocks. then, we will work our way backwards, loading each block. - // from there, we reconstruct the chain - let mut blocks: HashMap; - todo!(); - - } - - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error> { - todo!() - } - - fn save_block(&self, block: Block) -> Result<(), std::io::Error> { - todo!() - } - - fn load_block(&self, block_hash: &str) -> Result { - todo!() - } - - fn sync_chain(&self, chain: Chain) -> Result<(), std::io::Error> { - todo!() - } -} \ No newline at end of file diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index 0a3c6d7..bd0d5c1 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -13,8 +13,6 @@ use crate::{ primitives::block::{Block, BlockHeader}, }; -pub mod database; - pub(crate) trait Persistable where Self: Sized + PillarSerialize, diff --git a/libs/pillar_core/src/protocol/communication.rs b/libs/pillar_core/src/protocol/communication.rs index 7f51812..d487996 100644 --- a/libs/pillar_core/src/protocol/communication.rs +++ b/libs/pillar_core/src/protocol/communication.rs @@ -171,6 +171,7 @@ mod tests { use super::*; use pillar_serialize::PillarSerialize; use tokio::net::TcpStream; + use crate::nodes::node::StartupModes; use crate::nodes::peer::Peer; use crate::protocol::serialization::{package_standard_message}; use crate::{ @@ -184,7 +185,7 @@ mod tests { async fn test_peer_declaration() { let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8084; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], None, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); tokio::spawn(serve_peers(node.clone(), None)); @@ -214,7 +215,7 @@ mod tests { async fn test_message_broadcast() { let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8080; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], None, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); tokio::spawn(serve_peers(node.clone(), None)); tokio::spawn(broadcast_knowledge(node.clone(), None)); @@ -276,7 +277,7 @@ mod tests { async fn test_error_response() { let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8090; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], None, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); tokio::spawn(serve_peers(node,None)); @@ -318,7 +319,7 @@ mod tests { async fn test_timeout_broadcast(){ let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.9").unwrap()); let port = 8091; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], None, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); let (sender, stop_signal) = flume::bounded(1); // Create a stop signal receiver let handle = tokio::spawn(broadcast_knowledge(node.clone(), Some(stop_signal.clone()))); @@ -340,7 +341,7 @@ mod tests { async fn test_timeout_serve(){ let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.9").unwrap()); let port = 8091; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], None, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); let (sender, stop_signal) = flume::bounded(1); // Create a stop signal receiver let handle = tokio::spawn(serve_peers(node.clone(), Some(stop_signal.clone()))); diff --git a/libs/pillar_core/src/protocol/peers.rs b/libs/pillar_core/src/protocol/peers.rs index 95ed8ec..3dbd608 100644 --- a/libs/pillar_core/src/protocol/peers.rs +++ b/libs/pillar_core/src/protocol/peers.rs @@ -61,6 +61,7 @@ mod tests { use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::*; + use crate::nodes::node::StartupModes; use crate::nodes::{node::Node, peer::Peer}; use crate::protocol::serialization::{package_standard_message, read_standard_message}; use std::net::{IpAddr, Ipv4Addr}; @@ -84,7 +85,7 @@ mod tests { IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()), 8080, vec![existing_peer], - None, + StartupModes::Empty, None, ); diff --git a/pillar/src/run.rs b/pillar/src/run.rs index 827077e..831131b 100644 --- a/pillar/src/run.rs +++ b/pillar/src/run.rs @@ -1,7 +1,7 @@ use std::sync::{Arc}; use axum::{extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse, routing::{get, post}, Json, Router}; -use pillar_core::{accounting::{account, wallet::{self, Wallet}}, nodes::{miner::Miner, node::Node, peer::Peer}, persistence::database::GenesisDatastore, primitives::pool::MinerPool, protocol::peers::discover_peer, reputation::history::NodeHistory}; +use pillar_core::{accounting::{account, wallet::{self, Wallet}}, nodes::{miner::Miner, node::{Node, StartupModes}, peer::Peer}, primitives::pool::MinerPool, protocol::peers::discover_peer, reputation::history::NodeHistory}; use pillar_crypto::types::StdByteArray; use serde::{Deserialize, Serialize}; use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; @@ -508,7 +508,7 @@ pub async fn launch_node(config: Config, genesis: bool, miner: bool) { config.ip_address, pillar_core::PROTOCOL_PORT, vec![], - if genesis { Some(Arc::new(GenesisDatastore::new())) } else { None }, + if genesis { StartupModes::Genesis } else { StartupModes::Empty }, if miner { Some(MinerPool::new()) } else { None } ); From 6b219336b8457dd6be128792b813bb5184b973dc Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Sat, 18 Oct 2025 13:54:14 -0400 Subject: [PATCH 10/11] nodes can be created from persistence --- Cargo.lock | 49 ++++++++++ libs/pillar_core/Cargo.toml | 1 + libs/pillar_core/src/nodes/miner.rs | 4 +- libs/pillar_core/src/nodes/mod.rs | 59 ++---------- libs/pillar_core/src/nodes/node.rs | 76 ++++++++------- libs/pillar_core/src/persistence/manager.rs | 96 +++++++++++++++++++ libs/pillar_core/src/persistence/mod.rs | 17 +++- .../pillar_core/src/protocol/communication.rs | 11 +-- libs/pillar_core/src/protocol/peers.rs | 4 +- .../pillar_core/src/protocol/serialization.rs | 7 +- pillar/src/run.rs | 5 +- 11 files changed, 222 insertions(+), 107 deletions(-) create mode 100644 libs/pillar_core/src/persistence/manager.rs diff --git a/Cargo.lock b/Cargo.lock index cc4361f..718901d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,6 +483,27 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dirs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.60.2", +] + [[package]] name = "dtor" version = "0.0.6" @@ -902,6 +923,16 @@ version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +[[package]] +name = "libredox" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +dependencies = [ + "bitflags 2.9.1", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -1047,6 +1078,12 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "parking_lot" version = "0.11.2" @@ -1132,6 +1169,7 @@ dependencies = [ "bytes", "chrono", "ctor", + "dirs", "flume", "futures-util", "hex", @@ -1291,6 +1329,17 @@ dependencies = [ "bitflags 2.9.1", ] +[[package]] +name = "redox_users" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" +dependencies = [ + "getrandom 0.2.16", + "libredox", + "thiserror 2.0.17", +] + [[package]] name = "regex-automata" version = "0.4.9" diff --git a/libs/pillar_core/Cargo.toml b/libs/pillar_core/Cargo.toml index 4be1f45..8fdbae5 100644 --- a/libs/pillar_core/Cargo.toml +++ b/libs/pillar_core/Cargo.toml @@ -25,3 +25,4 @@ bytes = "1.10.1" byteorder = "1.5.0" hex = "0.4.3" tempfile = "3.23.0" +dirs = "6.0.0" diff --git a/libs/pillar_core/src/nodes/miner.rs b/libs/pillar_core/src/nodes/miner.rs index 91da347..32fd699 100644 --- a/libs/pillar_core/src/nodes/miner.rs +++ b/libs/pillar_core/src/nodes/miner.rs @@ -156,7 +156,7 @@ mod test{ use pillar_crypto::hashing::DefaultHash; - use crate::{nodes::node::StartupModes, primitives::{block::{Block, BlockTail}, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::MIN_DIFFICULTY, pow::mine}}; + use crate::{primitives::{block::{Block, BlockTail}, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::MIN_DIFFICULTY, pow::mine}}; use crate::nodes::miner::Miner; use super::Node; @@ -166,7 +166,7 @@ mod test{ let private_key = [2u8; 32]; let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8080; - let node = Node::new(public_key, private_key, ip_address, port, vec![], StartupModes::Genesis, Some(MinerPool::new())); + let node = Node::new(public_key, private_key, ip_address, port, vec![], true); let miner = Miner::new(node).unwrap(); let mut hasher = DefaultHash::new(); diff --git a/libs/pillar_core/src/nodes/mod.rs b/libs/pillar_core/src/nodes/mod.rs index 4053974..508e5d7 100644 --- a/libs/pillar_core/src/nodes/mod.rs +++ b/libs/pillar_core/src/nodes/mod.rs @@ -19,7 +19,7 @@ mod tests { use crate::{ accounting::{account, wallet::Wallet}, nodes::{ - miner::{Miner, MAX_TRANSACTION_WAIT_TIME}, node::{self, NodeState, StartupModes}, peer::Peer + miner::{Miner, MAX_TRANSACTION_WAIT_TIME}, node::{self, NodeState}, peer::Peer }, primitives::{messages::Message, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::get_reward_from_depth_and_stampers, peers::{discover_peer, discover_peers}, transactions::{get_transaction_proof, submit_transaction}} }; @@ -88,8 +88,7 @@ mod tests { ip_address: IpAddr, port: u16, peers: Vec, - genesis_store: bool, - miner_pool: Option, + genesis: bool ) -> (Node, Wallet){ let wallet = Wallet::generate_random(); @@ -100,8 +99,7 @@ mod tests { ip_address, port, peers.clone(), - if genesis_store {StartupModes::Genesis} else {StartupModes::Empty}, - miner_pool.clone(), + genesis, ); assert_eq!(node.inner.public_key, wallet.address); @@ -109,11 +107,10 @@ mod tests { assert_eq!(node.ip_address, ip_address); assert_eq!(node.port, port); assert_eq!(node.inner.peers.read().await.len(), peers.len()); - assert_eq!(node.inner.chain.lock().await.is_some(), genesis_store); - assert_eq!(node.miner_pool.is_some(), miner_pool.is_some()); + assert_eq!(node.inner.chain.lock().await.is_some(), genesis); assert!(node.inner.transaction_filters.lock().await.is_empty()); assert!(node.inner.filter_callbacks.lock().await.is_empty()); - if genesis_store{ + if genesis{ assert_eq!(node.inner.state.read().await.clone(), NodeState::ChainOutdated); }else{ assert_eq!(node.inner.state.read().await.clone(), NodeState::ICD); @@ -129,7 +126,6 @@ mod tests { let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); let port = 8070; let peers = vec![]; - let transaction_pool = None; let mut node = Node::new( public_key, @@ -137,8 +133,7 @@ mod tests { ip_address, port, peers, - StartupModes::Genesis, - transaction_pool, + true ); node.serve().await; @@ -169,7 +164,6 @@ mod tests { port, peers, true, - None, ) .await; @@ -187,7 +181,6 @@ mod tests { 8070, vec![node1.clone().into()], true, - None, ) .await; @@ -242,7 +235,6 @@ mod tests { port_a, vec![], true, - None, ) .await; @@ -251,7 +243,6 @@ mod tests { port_d, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], true, - None, ) .await; @@ -260,7 +251,6 @@ mod tests { port_c, vec![Peer::new(wallet_d.address, ip_address_d, port_d)], true, - None, ) .await; @@ -270,7 +260,6 @@ mod tests { port_b, vec![Peer::new(wallet_c.address, ip_address_c, port_c)], true, - None, ) .await; @@ -363,7 +352,6 @@ mod tests { port_b, vec![], true, - None, ) .await; @@ -372,7 +360,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -704,7 +691,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -713,7 +699,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -740,7 +725,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), // we will mine from this node ).await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -748,7 +732,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ).await; inner_test_transaction_and_block_proposal( @@ -927,7 +910,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -936,7 +918,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -945,7 +926,6 @@ mod tests { port_c, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], false, - None, ) .await; @@ -977,7 +957,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -986,7 +965,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -995,7 +973,6 @@ mod tests { port_c, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], false, - None, ) .await; @@ -1072,7 +1049,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -1081,7 +1057,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1090,7 +1065,6 @@ mod tests { port_c, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], false, - None, ) .await; @@ -1186,7 +1160,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -1195,7 +1168,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1279,7 +1251,6 @@ mod tests { port_a, vec![], true, - Some(MinerPool::new()), ).await; let (mut node_b, _ ) = create_empty_node_genisis( @@ -1287,7 +1258,6 @@ mod tests { port_b, vec![node_a.clone().into()], true, - Some(MinerPool::new()), ).await; let mut miner_a = Miner::new(node_a.clone()).unwrap(); @@ -1345,7 +1315,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -1354,7 +1323,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1619,7 +1587,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -1627,7 +1594,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1664,7 +1630,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -1672,7 +1637,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1730,7 +1694,6 @@ mod tests { port_c, vec![], true, - None, ).await; // create node b with miner pool let (mut node_b, wallet_b) = create_empty_node_genisis( @@ -1738,7 +1701,6 @@ mod tests { port_b, vec![Peer::new(wallet_c.address, ip_address_c, port_c)], true, - None, ) .await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -1746,7 +1708,6 @@ mod tests { port_a, vec![Peer::new(wallet_c.address, ip_address_c, port_c), Peer::new(wallet_b.address, ip_address_b, port_b)], true, - Some(MinerPool::new()), ) .await; @@ -1822,7 +1783,6 @@ mod tests { port_c, vec![], true, - None, ).await; // create node b with miner pool let (node_b, mut wallet_b) = create_empty_node_genisis( @@ -1830,7 +1790,6 @@ mod tests { port_b, vec![Peer::new(wallet_c.address, ip_address_c, port_c)], true, - Some(MinerPool::new()), ) .await; let (mut node_a, wallet_a) = create_empty_node_genisis( @@ -1838,7 +1797,6 @@ mod tests { port_a, vec![], true, - None, ) .await; @@ -1941,7 +1899,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; let (mut node_a, wallet_a) = create_empty_node_genisis( @@ -1949,7 +1906,6 @@ mod tests { port_a, vec![], true, - None, ) .await; @@ -1982,7 +1938,6 @@ mod tests { port_c, vec![], false, - None, ) .await; @@ -1991,7 +1946,6 @@ mod tests { port_b, vec![], true, - None, ) .await; let (mut node_a, wallet_a) = create_empty_node_genisis( @@ -1999,7 +1953,6 @@ mod tests { port_a, vec![], false, - None, ) .await; diff --git a/libs/pillar_core/src/nodes/node.rs b/libs/pillar_core/src/nodes/node.rs index 0b9ac07..60debb0 100644 --- a/libs/pillar_core/src/nodes/node.rs +++ b/libs/pillar_core/src/nodes/node.rs @@ -6,11 +6,9 @@ use std::{collections::{HashMap, HashSet}, net::IpAddr, path::PathBuf, sync::Arc use tokio::sync::{Mutex, RwLock}; use crate::{ - blockchain::chain::Chain, - primitives::{block::{Block, BlockHeader, Stamp}, messages::Message, pool::MinerPool, transaction::{FilterMatch, TransactionFilter}}, - protocol::{chain::{block_settle_consumer, discover_chain, service_sync, sync_chain}, + accounting::state::StateManager, blockchain::chain::Chain, persistence::{self, manager::PersistenceManager}, primitives::{block::{Block, BlockHeader, Stamp}, messages::Message, pool::MinerPool, transaction::{FilterMatch, TransactionFilter}}, protocol::{chain::{block_settle_consumer, discover_chain, service_sync, sync_chain}, communication::{broadcast_knowledge, serve_peers}, - reputation::{nth_percentile_peer, N_TRANSMISSION_SIGNATURES}}, + reputation::{nth_percentile_peer, N_TRANSMISSION_SIGNATURES}} }; /// Operational state of a node, which controls how it treats incoming data. @@ -74,21 +72,6 @@ impl From for String { } } -fn get_initial_state(startup_mode: &StartupModes) -> (NodeState, Option) { - // check if the datastore has a chain - match startup_mode { - StartupModes::Empty => (NodeState::ICD, None), - StartupModes::Load(path) => { - // if it does, load the chain - todo!(); - }, - StartupModes::Genesis => { - // if it does not, we are in discovery mode - (NodeState::ChainOutdated, Some(Chain::new_with_genesis())) - } - } -} - /// Shared inner state of a node (behind `Arc`). pub struct NodeInner { pub public_key: StdByteArray, @@ -128,14 +111,6 @@ pub struct Node { kill_settle: Option>, } -pub enum StartupModes { - Load(PathBuf), - Genesis, - Empty, -} - - - impl Node { /// Create a new node #[instrument( @@ -147,14 +122,47 @@ impl Node { port = port ) )] - pub fn new( + pub fn new<'a>( public_key: StdByteArray, private_key: StdByteArray, ip_address: IpAddr, port: u16, peers: Vec, - startup_mode: StartupModes, - transaction_pool: Option, + genesis: bool + ) -> Self { + let state = if genesis { NodeState::ChainOutdated } else { NodeState::ICD }; + let chain = if genesis { Some(Chain::new_with_genesis()) } else { None }; + tracing::debug!("Node initial state: {:?}", state); + Node::new_inner(public_key, private_key, ip_address, port, peers, chain, state) + } + + pub async fn new_load( + persistence_manager: &PersistenceManager, + ) -> Result{ + let chain = persistence_manager.load_chain().await?; + let node_state = persistence_manager.load_node().await?; + + let node = Node::new_inner( + node_state.public_key, + node_state.private_key, + node_state.ip_address, + node_state.port, + node_state.peers, + chain, + NodeState::ChainOutdated + ); + + Ok(node) + } + + fn new_inner( + public_key: StdByteArray, + private_key: StdByteArray, + ip_address: IpAddr, + port: u16, + peers: Vec, + chain: Option, + state: NodeState, ) -> Self { let broadcast_queue = lfqueue::UnboundedQueue::new(); let late_settle_queue = lfqueue::UnboundedQueue::new(); @@ -164,15 +172,13 @@ impl Node { .iter() .map(|peer| (peer.public_key, *peer)) .collect::>(); - tracing::info!("Node created with {} initial peers", peer_map.len()); - let (state, maybe_chain) = get_initial_state(&startup_mode); - tracing::debug!("Node initial state: {:?}", state); + Node { inner: NodeInner { public_key, private_key, peers: RwLock::new(peer_map), - chain: Mutex::new(maybe_chain), + chain: Mutex::new(chain), broadcasted_already, transaction_filters, broadcast_queue, @@ -182,7 +188,7 @@ impl Node { }.into(), ip_address, port, - miner_pool: transaction_pool, + miner_pool: Some(MinerPool::new()), kill_broadcast: None, kill_serve: None, kill_settle: None, diff --git a/libs/pillar_core/src/persistence/manager.rs b/libs/pillar_core/src/persistence/manager.rs new file mode 100644 index 0000000..7d6ed8b --- /dev/null +++ b/libs/pillar_core/src/persistence/manager.rs @@ -0,0 +1,96 @@ +use std::{net::IpAddr, path::PathBuf}; + +use pillar_crypto::types::StdByteArray; + +use crate::{blockchain::chain::Chain, nodes::{node::{self, Node}, peer::{Peer, PillarIPAddr}}, persistence::Persistable, PROTOCOL_PORT}; + + + +pub struct PersistenceManager { + pub root: PathBuf, + chain_root: PathBuf, + node_root: PathBuf, +} + +pub(crate) struct NodeState { + pub ip_address: IpAddr, + pub port: u16, + pub peers: Vec, + pub public_key: StdByteArray, + pub private_key: StdByteArray +} + +impl PersistenceManager { + /// Creates a new state manager working off + /// the root. + /// + /// If None, defaults to ~/.pillar + pub fn new(root: Option) -> Self { + let root = match root { + Some(a) => a, + None => { + tracing::info!("State setting up at ~/.pillar"); + let home = dirs::home_dir().expect("Cannot startup state without home dir if a path is not specified."); + let r = home.join(".pillar"); + if ! std::fs::exists(&r).unwrap() { + tracing::info!("Creating folder {}", r.display()); + std::fs::create_dir_all(&r).unwrap(); + } + r + } + }; + if !std::fs::exists(root.join("chain")).unwrap() { + std::fs::create_dir_all(root.join("chain")).unwrap(); + } + if !std::fs::exists(root.join("node")).unwrap() { + std::fs::create_dir_all(root.join("node")).unwrap(); + } + Self { + root: root.clone(), + chain_root: root.clone().join("chain"), + node_root: root.join("node"), + } + } + + pub(crate) async fn save_node(&self, node: &Node) -> Result<(), std::io::Error>{ + if let Some(chain) = node.inner.chain.lock().await.as_ref() { + chain.save(&self.chain_root).await?; + } + + let address: PillarIPAddr = node.ip_address.into(); + + node.inner.public_key.save(&self.root.join("public_key.bin")).await?; + node.inner.private_key.save(&self.root.join("private_key.bin")).await?; + node.inner.peers.read().await.save(&self.node_root.join("peers.bin")).await?; + address.save(&self.node_root.join("ip_address.bin")).await?; + node.port.save(&self.node_root.join("port.bin")).await?; + + Ok(()) + } + + pub(crate) async fn load_node(&self) -> Result { + let public_key = StdByteArray::load(&self.root.join("public_key.bin")).await?; + let private_key = StdByteArray::load(&self.root.join("private_key.bin")).await?; + let peers = Vec::::load(&self.node_root.join("peers.bin")).await?; + let ip_address: PillarIPAddr = PillarIPAddr::load(&self.node_root.join("ip_address.bin")).await?; + let port = u16::load(&self.node_root.join("port.bin")).await?; + + Ok(NodeState { + ip_address: ip_address.into(), + port, + peers, + public_key, + private_key + }) + + } + + pub async fn load_chain(&self) -> Result, std::io::Error> { + if std::fs::metadata(self.chain_root.join("chain.bin")).is_ok() { + Ok(Some(Chain::load(&self.chain_root).await?)) + } else { + Ok(None) + } + } + +} \ No newline at end of file diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index bd0d5c1..a13b991 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -8,14 +8,13 @@ use pillar_crypto::types::StdByteArray; use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; use crate::{ - accounting::state::StateManager, - blockchain::chain::Chain, - primitives::block::{Block, BlockHeader}, + accounting::state::StateManager, blockchain::chain::Chain, nodes::{node::Node, peer::PillarIPAddr}, primitives::block::{Block, BlockHeader} }; +pub mod manager; pub(crate) trait Persistable where - Self: Sized + PillarSerialize, + Self: PillarSerialize, { async fn save(&self, path: &PathBuf) -> Result<(), std::io::Error> { let bytes = self.serialize_pillar()?; @@ -28,6 +27,13 @@ where } impl Persistable for Vec where V: PillarSerialize {} +impl Persistable for HashMap +where + K: PillarSerialize + std::hash::Hash + Eq, + V: PillarSerialize, +{} + +impl Persistable for String {} impl Persistable for Block {} // we are using the default implementations, which is just serialize and save to a path impl Persistable for StateManager {} @@ -78,6 +84,9 @@ impl PillarNativeEndian for ChainMeta { } impl PillarFixedSize for ChainMeta {} impl Persistable for ChainMeta {} +impl Persistable for StdByteArray {} +impl Persistable for PillarIPAddr {} +impl Persistable for u16 {} // chains serialization is to save one block per file impl Persistable for Chain { diff --git a/libs/pillar_core/src/protocol/communication.rs b/libs/pillar_core/src/protocol/communication.rs index d487996..70b2296 100644 --- a/libs/pillar_core/src/protocol/communication.rs +++ b/libs/pillar_core/src/protocol/communication.rs @@ -171,7 +171,6 @@ mod tests { use super::*; use pillar_serialize::PillarSerialize; use tokio::net::TcpStream; - use crate::nodes::node::StartupModes; use crate::nodes::peer::Peer; use crate::protocol::serialization::{package_standard_message}; use crate::{ @@ -185,7 +184,7 @@ mod tests { async fn test_peer_declaration() { let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8084; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], false); tokio::spawn(serve_peers(node.clone(), None)); @@ -215,7 +214,7 @@ mod tests { async fn test_message_broadcast() { let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8080; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], false); tokio::spawn(serve_peers(node.clone(), None)); tokio::spawn(broadcast_knowledge(node.clone(), None)); @@ -277,7 +276,7 @@ mod tests { async fn test_error_response() { let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8090; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], false); tokio::spawn(serve_peers(node,None)); @@ -319,7 +318,7 @@ mod tests { async fn test_timeout_broadcast(){ let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.9").unwrap()); let port = 8091; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], false); let (sender, stop_signal) = flume::bounded(1); // Create a stop signal receiver let handle = tokio::spawn(broadcast_knowledge(node.clone(), Some(stop_signal.clone()))); @@ -341,7 +340,7 @@ mod tests { async fn test_timeout_serve(){ let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.9").unwrap()); let port = 8091; - let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], StartupModes::Empty, None); + let node = Node::new([1; 32], [2; 32], ip_address, port, vec![], false); let (sender, stop_signal) = flume::bounded(1); // Create a stop signal receiver let handle = tokio::spawn(serve_peers(node.clone(), Some(stop_signal.clone()))); diff --git a/libs/pillar_core/src/protocol/peers.rs b/libs/pillar_core/src/protocol/peers.rs index 3dbd608..889a1da 100644 --- a/libs/pillar_core/src/protocol/peers.rs +++ b/libs/pillar_core/src/protocol/peers.rs @@ -61,7 +61,6 @@ mod tests { use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::*; - use crate::nodes::node::StartupModes; use crate::nodes::{node::Node, peer::Peer}; use crate::protocol::serialization::{package_standard_message, read_standard_message}; use std::net::{IpAddr, Ipv4Addr}; @@ -85,8 +84,7 @@ mod tests { IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()), 8080, vec![existing_peer], - StartupModes::Empty, - None, + false ); // Mock new peer to be discovered diff --git a/libs/pillar_core/src/protocol/serialization.rs b/libs/pillar_core/src/protocol/serialization.rs index 0100395..8cc4d44 100644 --- a/libs/pillar_core/src/protocol/serialization.rs +++ b/libs/pillar_core/src/protocol/serialization.rs @@ -5,13 +5,14 @@ use pillar_crypto::{merkle_trie::MerkleTrie, types::{StdByteArray, STANDARD_ARRA use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; use tokio::{io::AsyncReadExt, net::TcpStream}; -use crate::{accounting::{account::{Account, TransactionStub}, state::StateManager}, blockchain::{chain::Chain, chain_shard::ChainShard}, nodes::peer::Peer, primitives::{block::{Block, BlockHeader}, messages::Message, transaction::{Transaction, TransactionFilter}}, reputation::history::{HeaderShard, NodeHistory}}; +use crate::{accounting::{account::{Account, TransactionStub}, state::StateManager}, blockchain::{chain::Chain, chain_shard::ChainShard}, nodes::peer::{Peer, PillarIPAddr}, primitives::{block::{Block, BlockHeader}, messages::Message, transaction::{Transaction, TransactionFilter}}, reputation::history::{HeaderShard, NodeHistory}}; impl PillarFixedSize for BlockHeader {} impl PillarFixedSize for Transaction {} impl PillarFixedSize for Peer {} impl PillarFixedSize for TransactionStub {} impl PillarFixedSize for HeaderShard {} +impl PillarFixedSize for PillarIPAddr {} /// Pack a serialized `Message` with a 4-byte little-endian length prefix. /// @@ -373,6 +374,10 @@ impl PillarNativeEndian for HeaderShard { } } +impl PillarNativeEndian for PillarIPAddr { + fn to_le(&mut self) {} +} + impl PillarSerialize for NodeHistory { /// Serialize a node's history as `[public_key:32][blocks_mined_len:u32][blocks_mined_bytes][blocks_stamped...]`. /// diff --git a/pillar/src/run.rs b/pillar/src/run.rs index 831131b..f2215ba 100644 --- a/pillar/src/run.rs +++ b/pillar/src/run.rs @@ -1,7 +1,7 @@ use std::sync::{Arc}; use axum::{extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse, routing::{get, post}, Json, Router}; -use pillar_core::{accounting::{account, wallet::{self, Wallet}}, nodes::{miner::Miner, node::{Node, StartupModes}, peer::Peer}, primitives::pool::MinerPool, protocol::peers::discover_peer, reputation::history::NodeHistory}; +use pillar_core::{accounting::{account, wallet::{self, Wallet}}, nodes::{miner::Miner, node::Node, peer::Peer}, primitives::pool::MinerPool, protocol::peers::discover_peer, reputation::history::NodeHistory}; use pillar_crypto::types::StdByteArray; use serde::{Deserialize, Serialize}; use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; @@ -508,8 +508,7 @@ pub async fn launch_node(config: Config, genesis: bool, miner: bool) { config.ip_address, pillar_core::PROTOCOL_PORT, vec![], - if genesis { StartupModes::Genesis } else { StartupModes::Empty }, - if miner { Some(MinerPool::new()) } else { None } + genesis ); for peer in &config.wkps { From 80df98ec9094e344e39986b72dcb7ff6786920fd Mon Sep 17 00:00:00 2001 From: aheschl1 Date: Sat, 18 Oct 2025 14:33:47 -0400 Subject: [PATCH 11/11] tested working --- libs/pillar_core/src/nodes/mod.rs | 131 +++++++++++++++++++- libs/pillar_core/src/nodes/node.rs | 7 +- libs/pillar_core/src/persistence/manager.rs | 22 ++-- 3 files changed, 146 insertions(+), 14 deletions(-) diff --git a/libs/pillar_core/src/nodes/mod.rs b/libs/pillar_core/src/nodes/mod.rs index 508e5d7..1e7f179 100644 --- a/libs/pillar_core/src/nodes/mod.rs +++ b/libs/pillar_core/src/nodes/mod.rs @@ -9,6 +9,7 @@ mod tests { use chrono::Local; use pillar_crypto::{hashing::{DefaultHash, Hashable}, signing::{SigFunction, Signable}, types::StdByteArray}; + use tempfile::TempDir; use tracing::level_filters::LevelFilter; use tracing_subscriber::{ Layer, Registry, @@ -20,15 +21,13 @@ mod tests { use crate::{ accounting::{account, wallet::Wallet}, nodes::{ miner::{Miner, MAX_TRANSACTION_WAIT_TIME}, node::{self, NodeState}, peer::Peer - }, primitives::{messages::Message, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::get_reward_from_depth_and_stampers, peers::{discover_peer, discover_peers}, transactions::{get_transaction_proof, submit_transaction}} + }, persistence::manager::PersistenceManager, primitives::{messages::Message, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::get_reward_from_depth_and_stampers, peers::{discover_peer, discover_peers}, transactions::{get_transaction_proof, submit_transaction}} }; use super::node::Node; use std::{ - fs::File, - net::{IpAddr, Ipv4Addr}, - sync::Arc, time::Duration, + fs::File, net::{IpAddr, Ipv4Addr}, path::PathBuf, sync::Arc, time::Duration }; // always setup tracing first @@ -1994,4 +1993,128 @@ mod tests { } + async fn save_node(node: Node) -> TempDir{ + let path = tempfile::tempdir().unwrap(); + let persistence_manager = PersistenceManager::new(Some(path.path().to_path_buf())); + persistence_manager.save_node(&node).await.unwrap(); + path + } + + #[tokio::test] + async fn test_sync_two_blocks_persistence() { + let ip_address_a = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 8)); + let port_a = 6999; + + let ip_address_b = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 9)); + let port_b = 6998; + + let ip_address_c = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 5)); + let port_c = 6997; + + + let (mut node_b, wallet_b) = create_empty_node_genisis( + ip_address_b, + port_b, + vec![], + true, + ) + .await; + + let (mut node_a, mut wallet_a) = create_empty_node_genisis( + ip_address_a, + port_a, + vec![Peer::new(wallet_b.address, ip_address_b, port_b)], + true, + ) + .await; + + let (mut node_c, mut wallet_c) = create_empty_node_genisis( + ip_address_c, + port_c, + vec![Peer::new(wallet_a.address, ip_address_a, port_a)], + false, + ) + .await; + + inner_test_new_node_online( + &mut node_a, + &mut node_b, + &mut node_c, + &mut wallet_a, + &wallet_b, + &wallet_c, + ) + .await; + // now, let A go offline. C will submit a transaction. + println!("Stopping node A - expect ConnetionRefused errors"); + node_a.stop().await; + let path_a = save_node(node_a).await; + println!("{path_a:?}"); + // pause a sec - TODO remove this after fixing the join on stoping + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + // now, C will submit a transaction to B + let _ = submit_transaction( + &mut node_c, + &mut wallet_c, + wallet_b.address, + 0, + false, + None, + ) + .await + .unwrap(); + // pause a sec + tokio::time::sleep(std::time::Duration::from_secs(10)).await; // time to settle, and give up waiting for full block + // now both shold have 3 blocks + let chain_b = node_b.inner.chain.lock().await; + assert!(chain_b.as_ref().unwrap().depth == 2); // 3 blocks + assert!(chain_b.as_ref().unwrap().blocks.len() == 3); // 3 blocks + drop(chain_b); + // now c + let chain_c = node_c.inner.chain.lock().await; + assert!(chain_c.as_ref().unwrap().depth == 2); // 3 blocks + assert!(chain_c.as_ref().unwrap().blocks.len() == 3); // 3 blocks + drop(chain_c); + // DO A SECOND TRANSACTION TO GET 2 BLOCKS OUTDATED + let _ = submit_transaction( + &mut node_c, + &mut wallet_c, + wallet_b.address, + 0, + false, + None, + ) + .await + .unwrap(); + // pause a sec + tokio::time::sleep(std::time::Duration::from_secs(15)).await; // time to settle, and give up waiting for full block + // now both shold have 4 blocks + let chain_b = node_b.inner.chain.lock().await; + assert!(chain_b.as_ref().unwrap().depth == 3); // 4 blocks + assert!(chain_b.as_ref().unwrap().blocks.len() == 4); // 4 blocks + drop(chain_b); + // now c + let chain_c = node_c.inner.chain.lock().await; + assert!(chain_c.as_ref().unwrap().depth == 3); // 4 blocks + assert!(chain_c.as_ref().unwrap().blocks.len() == 4); // 4 blocks + drop(chain_c); + // put A back online + let mut node_a = Node::new_load(&PersistenceManager::new(Some(path_a.path().to_path_buf()))).await.expect("Failed to load node A"); + node_a.serve().await; + assert!(node_a.inner.state.read().await.clone() == NodeState::ChainSyncing); + // wait for a bit + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + // check if node a knows about node b and c + let peers_a = node_a.inner.peers.read().await; + assert!(peers_a.contains_key(&wallet_b.address)); + assert!(peers_a.contains_key(&wallet_c.address)); + drop(peers_a); + // now check that it collected the block + let chain_a = node_a.inner.chain.lock().await; + assert_eq!(chain_a.as_ref().unwrap().depth, 3); // 3 blocks + assert_eq!(chain_a.as_ref().unwrap().blocks.len(), 4); // 3 blocks + // now, node A should be in serving state + assert!(node_a.inner.state.read().await.clone() == NodeState::Serving); + } + } diff --git a/libs/pillar_core/src/nodes/node.rs b/libs/pillar_core/src/nodes/node.rs index 60debb0..10a531f 100644 --- a/libs/pillar_core/src/nodes/node.rs +++ b/libs/pillar_core/src/nodes/node.rs @@ -141,6 +141,11 @@ impl Node { ) -> Result{ let chain = persistence_manager.load_chain().await?; let node_state = persistence_manager.load_node().await?; + let state = if chain.is_some() { + NodeState::ChainOutdated + } else { + NodeState::ICD + }; let node = Node::new_inner( node_state.public_key, @@ -149,7 +154,7 @@ impl Node { node_state.port, node_state.peers, chain, - NodeState::ChainOutdated + state ); Ok(node) diff --git a/libs/pillar_core/src/persistence/manager.rs b/libs/pillar_core/src/persistence/manager.rs index 7d6ed8b..f5990ce 100644 --- a/libs/pillar_core/src/persistence/manager.rs +++ b/libs/pillar_core/src/persistence/manager.rs @@ -12,7 +12,7 @@ pub struct PersistenceManager { node_root: PathBuf, } -pub(crate) struct NodeState { +pub(crate) struct NodeShard { pub ip_address: IpAddr, pub port: u16, pub peers: Vec, @@ -59,23 +59,27 @@ impl PersistenceManager { let address: PillarIPAddr = node.ip_address.into(); - node.inner.public_key.save(&self.root.join("public_key.bin")).await?; - node.inner.private_key.save(&self.root.join("private_key.bin")).await?; - node.inner.peers.read().await.save(&self.node_root.join("peers.bin")).await?; + node.inner.public_key.save(&self.node_root.join("public_key.bin")).await?; + node.inner.private_key.save(&self.node_root.join("private_key.bin")).await?; + node.inner.peers.read() + .await.values() + .cloned() + .collect::>() + .save(&self.node_root.join("peers.bin")).await?; address.save(&self.node_root.join("ip_address.bin")).await?; node.port.save(&self.node_root.join("port.bin")).await?; Ok(()) } - pub(crate) async fn load_node(&self) -> Result { - let public_key = StdByteArray::load(&self.root.join("public_key.bin")).await?; - let private_key = StdByteArray::load(&self.root.join("private_key.bin")).await?; + pub(crate) async fn load_node(&self) -> Result { + let public_key = StdByteArray::load(&self.node_root.join("public_key.bin")).await?; + let private_key = StdByteArray::load(&self.node_root.join("private_key.bin")).await?; let peers = Vec::::load(&self.node_root.join("peers.bin")).await?; let ip_address: PillarIPAddr = PillarIPAddr::load(&self.node_root.join("ip_address.bin")).await?; let port = u16::load(&self.node_root.join("port.bin")).await?; - Ok(NodeState { + Ok(NodeShard { ip_address: ip_address.into(), port, peers, @@ -86,7 +90,7 @@ impl PersistenceManager { } pub async fn load_chain(&self) -> Result, std::io::Error> { - if std::fs::metadata(self.chain_root.join("chain.bin")).is_ok() { + if std::fs::metadata(self.chain_root.join("meta.bin")).is_ok() { Ok(Some(Chain::load(&self.chain_root).await?)) } else { Ok(None)