diff --git a/Cargo.lock b/Cargo.lock index 0927acc..718901d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,6 +483,27 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dirs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.60.2", +] + [[package]] name = "dtor" version = "0.0.6" @@ -523,6 +544,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fiat-crypto" version = "0.2.9" @@ -886,6 +923,22 @@ version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +[[package]] +name = "libredox" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +dependencies = [ + "bitflags 2.9.1", + "libc", +] + +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "lock_api" version = "0.4.13" @@ -1025,6 +1078,12 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "parking_lot" version = "0.11.2" @@ -1110,8 +1169,10 @@ dependencies = [ "bytes", "chrono", "ctor", + "dirs", "flume", "futures-util", + "hex", "lfqueue", "lz4_flex", "pillar_crypto", @@ -1119,6 +1180,7 @@ dependencies = [ "rand", "sled", "slotmap", + "tempfile", "tokio", "tracing", "tracing-appender", @@ -1267,6 +1329,17 @@ dependencies = [ "bitflags 2.9.1", ] +[[package]] +name = "redox_users" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" +dependencies = [ + "getrandom 0.2.16", + "libredox", + "thiserror 2.0.17", +] + [[package]] name = "regex-automata" version = "0.4.9" @@ -1299,6 +1372,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +dependencies = [ + "bitflags 2.9.1", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.60.2", +] + [[package]] name = "rustversion" version = "1.0.21" @@ -1555,6 +1641,19 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +[[package]] +name = "tempfile" +version = "3.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" +dependencies = [ + "fastrand", + "getrandom 0.3.3", + "once_cell", + "rustix", + "windows-sys 0.60.2", +] + [[package]] name = "thiserror" version = "1.0.69" diff --git a/libs/pillar_core/Cargo.toml b/libs/pillar_core/Cargo.toml index 210839a..8fdbae5 100644 --- a/libs/pillar_core/Cargo.toml +++ b/libs/pillar_core/Cargo.toml @@ -23,3 +23,6 @@ lz4_flex = "0.11.5" bytemuck = {version="1.23.2", features=["derive"]} bytes = "1.10.1" byteorder = "1.5.0" +hex = "0.4.3" +tempfile = "3.23.0" +dirs = "6.0.0" diff --git a/libs/pillar_core/src/accounting/state.rs b/libs/pillar_core/src/accounting/state.rs index febf806..bf8e060 100644 --- a/libs/pillar_core/src/accounting/state.rs +++ b/libs/pillar_core/src/accounting/state.rs @@ -1,5 +1,5 @@ //! State management built on a Merkle trie with optional reputation tracking. -use std::{collections::HashMap, fmt::Debug, sync::{Arc, Mutex}}; +use std::{collections::HashMap, fmt::Debug}; use pillar_crypto::{merkle_trie::MerkleTrie, types::StdByteArray}; @@ -12,9 +12,9 @@ pub type ReputationMap = HashMap; #[derive(Clone, Default)] pub struct StateManager{ // The mapping from address to account - pub state_trie: Arc>>, + pub state_trie: MerkleTrie, /// mapping of reputations for peers - pub reputations: Arc>, + pub reputations: ReputationMap, } impl Debug for StateManager { @@ -36,32 +36,29 @@ impl StateManager{ /// Create a new state manager with an empty trie and no reputations. pub fn new() -> Self { StateManager { - state_trie: Arc::new(Mutex::new(MerkleTrie::new())), - reputations: Arc::new(Mutex::new(HashMap::new())), + state_trie: MerkleTrie::new(), + reputations: HashMap::new(), } } /// Get an account by address at a specific `state_root` (if present). pub fn get_account(&self, address: &StdByteArray, state_root: StdByteArray) -> Option { - let state_trie = self.state_trie.lock().expect("Failed to lock state trie"); - state_trie.get(address, state_root) + self.state_trie.get(address, state_root) } /// Get an account or a default zero-balance account for the given address. pub fn get_account_or_default(&self, address: &StdByteArray, state_root: StdByteArray) -> Account { - let state_trie = self.state_trie.lock().expect("Failed to lock state trie"); - state_trie.get(address, state_root).unwrap_or(Account::new(*address, 0)) + self.state_trie.get(address, state_root).unwrap_or(Account::new(*address, 0)) } /// Return all accounts reachable from `root`. pub fn get_all_accounts(&self, root: StdByteArray) -> Vec{ - self.state_trie.lock().unwrap().get_all(root) + self.state_trie.get_all(root) } /// Remove a branched root and decrement reference counts, pruning unique nodes. pub fn remove_branch(&mut self, root: StdByteArray){ - let mut state_trie = self.state_trie.lock().expect("Failed to lock state trie"); - state_trie.trim_branch(root).expect("Failed to remove branch from state trie"); + self.state_trie.trim_branch(root).expect("Failed to remove branch from state trie"); } /// Apply a complete block to produce a new state root branch. @@ -99,13 +96,12 @@ impl StateManager{ // Update the accounts from the block let mut state_updates: HashMap = HashMap::new(); let state_root = prev_header.completion.as_ref().expect("Previous block should be complete").state_root; - let mut state_trie = self.state_trie.lock().expect("Failed to lock state trie"); for transaction in &block.transactions { let mut sender = match state_updates.get(&transaction.header.sender){ Some(account) => account.clone(), None => { // if the sender does not exist, we create a new account with 0 balance - let account = state_trie + let account = self.state_trie .get(&transaction.header.sender, state_root) .unwrap_or(Account::new(transaction.header.sender, 0)); @@ -124,7 +120,7 @@ impl StateManager{ let mut receiver = match state_updates.get(&transaction.header.receiver){ Some(account) => account.clone(), None => { - state_trie.get(&transaction.header.receiver, state_root).unwrap_or(Account::new(transaction.header.receiver, 0)) + self.state_trie.get(&transaction.header.receiver, state_root).unwrap_or(Account::new(transaction.header.receiver, 0)) }, }; // update balances @@ -138,7 +134,7 @@ impl StateManager{ Some(account) => account.clone(), None => { // if the miner account does not exist, we create a new account with 0 balance - state_trie.get(miner_address, state_root).unwrap_or(Account::new(*miner_address, 0)) + self.state_trie.get(miner_address, state_root).unwrap_or(Account::new(*miner_address, 0)) } }; miner_account.balance += if !por_enabled {reward} else {div_up(reward, POR_MINER_SHARE_DIVISOR)}; @@ -161,7 +157,7 @@ impl StateManager{ let mut stamper_account = match state_updates.get(stamper) { Some(account) => account.clone(), None => { - state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) + self.state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) } }; stamper_account.balance += stamper_reward; @@ -183,7 +179,7 @@ impl StateManager{ let mut stamper = match state_updates.get(stamper){ Some(account) => account.clone(), None => { - state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) + self.state_trie.get(stamper, state_root).unwrap_or(Account::new(*stamper, 0)) } }; if stamper.history.is_none(){ @@ -194,6 +190,6 @@ impl StateManager{ state_updates.insert(stamper.address, stamper); } // branch the state trie with the updates - state_trie.branch(Some(state_root), state_updates).expect("Issue with branching state trie") + self.state_trie.branch(Some(state_root), state_updates).expect("Issue with branching state trie") } } \ No newline at end of file diff --git a/libs/pillar_core/src/accounting/wallet.rs b/libs/pillar_core/src/accounting/wallet.rs index dfc2bb4..f48544b 100644 --- a/libs/pillar_core/src/accounting/wallet.rs +++ b/libs/pillar_core/src/accounting/wallet.rs @@ -1,5 +1,4 @@ //! Wallet wrapper around an ed25519 keypair with convenience methods. -use bytemuck::{Pod, Zeroable}; use pillar_crypto::{signing::{DefaultSigner, DefaultVerifier, SigFunction, SigVerFunction, Signable}, types::{StdByteArray, STANDARD_ARRAY_LENGTH}}; use pillar_serialize::PillarSerialize; diff --git a/libs/pillar_core/src/blockchain/chain.rs b/libs/pillar_core/src/blockchain/chain.rs index ebba4f9..172919e 100644 --- a/libs/pillar_core/src/blockchain/chain.rs +++ b/libs/pillar_core/src/blockchain/chain.rs @@ -1,7 +1,5 @@ -use core::hash; -use std::{collections::{HashMap, HashSet}, result}; +use std::{collections::{HashMap, HashSet}}; -use lz4_flex::block; use pillar_crypto::{hashing::DefaultHash, signing::{DefaultVerifier, SigVerFunction}, types::StdByteArray}; use tracing::instrument; @@ -33,12 +31,9 @@ pub struct Chain { impl Chain { /// Creates a new blockchain with a genesis block. pub fn new_with_genesis() -> Self { - let state_manager = StateManager::new(); + let mut state_manager = StateManager::new(); let state_root = state_manager .state_trie - .lock() - .as_mut() - .unwrap() .create_genesis([0; 32], Account::default()) .expect("Failed to create genesis state root"); @@ -66,11 +61,12 @@ impl Chain { } /// Create a chain without a genesis block. + /// Does NOT set state manager #[instrument( skip_all, name = "Chain::new_from_blocks", )] - pub fn new_from_blocks(blocks: HashMap) -> Self{ + pub(crate) unsafe fn new_from_blocks(blocks: HashMap) -> Self{ let mut headers = HashMap::new(); let mut deepest_hash = [0; 32]; let mut depth = 0; @@ -116,7 +112,7 @@ impl Chain { // get all reputations according to previous block let reputations = get_current_reputations_for_stampers(self, &block.header).values().cloned().collect::>(); - let (expected_target, is_por) = get_difficulty_for_block(&block.header, &reputations); + let (expected_target, _is_por) = get_difficulty_for_block(&block.header, &reputations); if block.header.completion.is_none() || expected_target != block.header.completion.as_ref().unwrap().difficulty_target { tracing::info!("Block difficulty target is invalid - Failing"); @@ -290,6 +286,33 @@ impl Chain { Ok(()) } + /// retruns False if the existing deepest is better + /// returns true if it shold be replaced + #[inline] + pub(crate) fn block_greater(&self, new_block: &Block) -> bool { + if self.depth > new_block.header.depth { + false + } else if self.depth == new_block.header.depth { + // take the block which is stamped with the most reputation + let current_deepest = self.get_top_block().unwrap(); + let existing_reputation: f64 = get_current_reputations_for_stampers_from_state( + &self.state_manager, + self.headers.get(¤t_deepest.header.previous_hash).unwrap(), + ¤t_deepest.header + ).values().sum(); + let new_reputation: f64 = get_current_reputations_for_stampers_from_state( + &self.state_manager, + self.headers.get(&new_block.header.previous_hash).unwrap(), + &new_block.header + ).values().sum(); + new_reputation > existing_reputation || ( + new_reputation == existing_reputation && new_block.header.tail.n_stamps() > current_deepest.header.tail.n_stamps() + ) + } else { // self.depth < new_block.header.depth + true + } + } + /// Call this only after a block has been verified #[instrument(skip_all, fields(block = ?block.header.completion))] fn settle_new_block(&mut self, block: Block) -> Result<(), BlockValidationError>{ @@ -313,29 +336,10 @@ impl Chain { tracing::debug!("Block settled in chain, but need to update depth."); // update the depth - the depth of this block is checked in the verification // perhaps this is a fork deeper in the chain, so we do not always update - if block.header.depth > self.depth { + if self.block_greater(&block) { tracing::info!("Chain depth expanded to {}", block.header.depth); self.deepest_hash = block.header.completion.as_ref().unwrap().hash; self.depth = block.header.depth; - } else if block.header.depth == self.depth { - // take the block which is stamped with the most reputation - let current_deepest = self.get_top_block().unwrap(); - let existing_reputation: f64 = get_current_reputations_for_stampers_from_state( - &self.state_manager, - self.headers.get(¤t_deepest.header.previous_hash).unwrap(), - ¤t_deepest.header - ).values().sum(); - let new_reputation: f64 = get_current_reputations_for_stampers_from_state( - &self.state_manager, - self.headers.get(&block.header.previous_hash).unwrap(), - &block.header - ).values().sum(); - if new_reputation > existing_reputation || ( - new_reputation == existing_reputation && block.header.tail.n_stamps() > current_deepest.header.tail.n_stamps() - ){ - tracing::info!("Chain depth fork replaced deepest block with higher reputation fork"); - self.deepest_hash = block.header.completion.as_ref().unwrap().hash; - } } Ok(()) } @@ -373,7 +377,7 @@ impl Chain { let deepest = &self.deepest_hash; let mut stack: Vec = self.leaves.iter().filter(|x| { - let b = self.get_block(*x).unwrap(); + let b = self.get_block(x).unwrap(); x != &deepest && b.header.depth >= min_depth.unwrap_or(0) }).cloned().collect(); stack.push(*deepest); @@ -396,7 +400,7 @@ impl Chain { } stack.push(b.header.previous_hash); // this block depth is 1 greater than previous - if b.header.depth+1 <= max_depth.unwrap_or(u64::MAX) { + if b.header.depth < max_depth.unwrap_or(u64::MAX) { result.push(block); } @@ -936,4 +940,49 @@ mod tests { assert!(chain.blocks.contains_key(&main_hash)); assert_eq!(chain.leaves.len(), 1); // Only the main chain remains } + + #[tokio::test] + async fn test_new_from_blocks(){ + let mut chain = Chain::new_with_genesis(); + let mut signing_key = DefaultSigner::generate_random(); + // public + let sender = signing_key.get_verifying_function().to_bytes(); + + let mut parent_hash = chain.deepest_hash; + let genesis_hash = parent_hash; + + // Build a main chain of depth 5 + for depth in 1..=5 { + let time = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() + depth; + let mut transaction = Transaction::new(sender, [2; 32], 0, time, depth-1, &mut DefaultHash::new()); + transaction.sign(&mut signing_key); + let mut block = Block::new( + parent_hash, + 0, + time, + vec![transaction], + None, + BlockTail::default().stamps, + depth, + None, + None, + &mut DefaultHash::new(), + ); + let prev_header = chain.headers.get(&block.header.previous_hash) + .expect("Previous block header not found"); + let state_root = chain.state_manager.branch_from_block_internal(&block, prev_header, &sender); + mine(&mut block, sender, state_root, vec![], None, DefaultHash::new()).await; + parent_hash = block.header.completion.as_ref().unwrap().hash; + chain.add_new_block(block).unwrap(); + } + + let blocks: HashMap = chain.blocks.clone(); + let new_chain = unsafe { Chain::new_from_blocks(blocks) }; + assert_eq!(new_chain.depth, chain.depth); + assert_eq!(new_chain.leaves, chain.leaves); + assert_eq!(new_chain.blocks, chain.blocks); + assert_eq!(new_chain.headers, chain.headers); + assert_eq!(new_chain.deepest_hash, chain.deepest_hash); + + } } diff --git a/libs/pillar_core/src/blockchain/chain_shard.rs b/libs/pillar_core/src/blockchain/chain_shard.rs index 7e27067..37cf7d7 100644 --- a/libs/pillar_core/src/blockchain/chain_shard.rs +++ b/libs/pillar_core/src/blockchain/chain_shard.rs @@ -20,11 +20,8 @@ impl ChainShard{ /// ensures the hashs are good, and the depths work pub fn validate(&self) -> Result<(), BlockValidationError>{ let mut genesis_found = false; - let state_manager = StateManager::new(); + let mut state_manager = StateManager::new(); let state_root = state_manager.state_trie - .lock() - .as_mut() - .unwrap() .create_genesis([0; 32], Account::default()).unwrap(); for (declared_hash, header) in &self.headers { diff --git a/libs/pillar_core/src/nodes/miner.rs b/libs/pillar_core/src/nodes/miner.rs index 7c59565..32fd699 100644 --- a/libs/pillar_core/src/nodes/miner.rs +++ b/libs/pillar_core/src/nodes/miner.rs @@ -156,7 +156,7 @@ mod test{ use pillar_crypto::hashing::DefaultHash; - use crate::{persistence::database::GenesisDatastore, primitives::{block::{Block, BlockTail}, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::MIN_DIFFICULTY, pow::mine}}; + use crate::{primitives::{block::{Block, BlockTail}, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::MIN_DIFFICULTY, pow::mine}}; use crate::nodes::miner::Miner; use super::Node; @@ -166,7 +166,7 @@ mod test{ let private_key = [2u8; 32]; let ip_address = IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()); let port = 8080; - let node = Node::new(public_key, private_key, ip_address, port, vec![], Some(Arc::new(GenesisDatastore::new())), Some(MinerPool::new())); + let node = Node::new(public_key, private_key, ip_address, port, vec![], true); let miner = Miner::new(node).unwrap(); let mut hasher = DefaultHash::new(); diff --git a/libs/pillar_core/src/nodes/mod.rs b/libs/pillar_core/src/nodes/mod.rs index f5dc8b8..1e7f179 100644 --- a/libs/pillar_core/src/nodes/mod.rs +++ b/libs/pillar_core/src/nodes/mod.rs @@ -9,6 +9,7 @@ mod tests { use chrono::Local; use pillar_crypto::{hashing::{DefaultHash, Hashable}, signing::{SigFunction, Signable}, types::StdByteArray}; + use tempfile::TempDir; use tracing::level_filters::LevelFilter; use tracing_subscriber::{ Layer, Registry, @@ -20,15 +21,13 @@ mod tests { use crate::{ accounting::{account, wallet::Wallet}, nodes::{ miner::{Miner, MAX_TRANSACTION_WAIT_TIME}, node::{self, NodeState}, peer::Peer - }, persistence::database::GenesisDatastore, primitives::{messages::Message, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::get_reward_from_depth_and_stampers, peers::{discover_peer, discover_peers}, transactions::{get_transaction_proof, submit_transaction}} + }, persistence::manager::PersistenceManager, primitives::{messages::Message, pool::MinerPool, transaction::Transaction}, protocol::{difficulty::get_reward_from_depth_and_stampers, peers::{discover_peer, discover_peers}, transactions::{get_transaction_proof, submit_transaction}} }; use super::node::Node; use std::{ - fs::File, - net::{IpAddr, Ipv4Addr}, - sync::Arc, time::Duration, + fs::File, net::{IpAddr, Ipv4Addr}, path::PathBuf, sync::Arc, time::Duration }; // always setup tracing first @@ -88,8 +87,7 @@ mod tests { ip_address: IpAddr, port: u16, peers: Vec, - genesis_store: bool, - miner_pool: Option, + genesis: bool ) -> (Node, Wallet){ let wallet = Wallet::generate_random(); @@ -100,8 +98,7 @@ mod tests { ip_address, port, peers.clone(), - if genesis_store {Some(Arc::new(GenesisDatastore::new()))} else {None}, - miner_pool.clone(), + genesis, ); assert_eq!(node.inner.public_key, wallet.address); @@ -109,11 +106,10 @@ mod tests { assert_eq!(node.ip_address, ip_address); assert_eq!(node.port, port); assert_eq!(node.inner.peers.read().await.len(), peers.len()); - assert_eq!(node.inner.chain.lock().await.is_some(), genesis_store); - assert_eq!(node.miner_pool.is_some(), miner_pool.is_some()); + assert_eq!(node.inner.chain.lock().await.is_some(), genesis); assert!(node.inner.transaction_filters.lock().await.is_empty()); assert!(node.inner.filter_callbacks.lock().await.is_empty()); - if genesis_store{ + if genesis{ assert_eq!(node.inner.state.read().await.clone(), NodeState::ChainOutdated); }else{ assert_eq!(node.inner.state.read().await.clone(), NodeState::ICD); @@ -129,8 +125,6 @@ mod tests { let ip_address = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); let port = 8070; let peers = vec![]; - let datastore = GenesisDatastore::new(); - let transaction_pool = None; let mut node = Node::new( public_key, @@ -138,8 +132,7 @@ mod tests { ip_address, port, peers, - Some(Arc::new(datastore)), - transaction_pool, + true ); node.serve().await; @@ -170,7 +163,6 @@ mod tests { port, peers, true, - None, ) .await; @@ -188,7 +180,6 @@ mod tests { 8070, vec![node1.clone().into()], true, - None, ) .await; @@ -243,7 +234,6 @@ mod tests { port_a, vec![], true, - None, ) .await; @@ -252,7 +242,6 @@ mod tests { port_d, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], true, - None, ) .await; @@ -261,7 +250,6 @@ mod tests { port_c, vec![Peer::new(wallet_d.address, ip_address_d, port_d)], true, - None, ) .await; @@ -271,7 +259,6 @@ mod tests { port_b, vec![Peer::new(wallet_c.address, ip_address_c, port_c)], true, - None, ) .await; @@ -364,7 +351,6 @@ mod tests { port_b, vec![], true, - None, ) .await; @@ -373,7 +359,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -705,7 +690,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -714,7 +698,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -741,7 +724,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), // we will mine from this node ).await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -749,7 +731,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ).await; inner_test_transaction_and_block_proposal( @@ -928,7 +909,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -937,7 +917,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -946,7 +925,6 @@ mod tests { port_c, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], false, - None, ) .await; @@ -978,7 +956,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -987,7 +964,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -996,7 +972,6 @@ mod tests { port_c, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], false, - None, ) .await; @@ -1073,7 +1048,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -1082,7 +1056,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1091,7 +1064,6 @@ mod tests { port_c, vec![Peer::new(wallet_a.address, ip_address_a, port_a)], false, - None, ) .await; @@ -1187,7 +1159,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -1196,7 +1167,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1280,7 +1250,6 @@ mod tests { port_a, vec![], true, - Some(MinerPool::new()), ).await; let (mut node_b, _ ) = create_empty_node_genisis( @@ -1288,7 +1257,6 @@ mod tests { port_b, vec![node_a.clone().into()], true, - Some(MinerPool::new()), ).await; let mut miner_a = Miner::new(node_a.clone()).unwrap(); @@ -1346,7 +1314,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; @@ -1355,7 +1322,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1620,7 +1586,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -1628,7 +1593,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1665,7 +1629,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -1673,7 +1636,6 @@ mod tests { port_a, vec![Peer::new(wallet_b.address, ip_address_b, port_b)], true, - None, ) .await; @@ -1731,7 +1693,6 @@ mod tests { port_c, vec![], true, - None, ).await; // create node b with miner pool let (mut node_b, wallet_b) = create_empty_node_genisis( @@ -1739,7 +1700,6 @@ mod tests { port_b, vec![Peer::new(wallet_c.address, ip_address_c, port_c)], true, - None, ) .await; let (mut node_a, mut wallet_a) = create_empty_node_genisis( @@ -1747,7 +1707,6 @@ mod tests { port_a, vec![Peer::new(wallet_c.address, ip_address_c, port_c), Peer::new(wallet_b.address, ip_address_b, port_b)], true, - Some(MinerPool::new()), ) .await; @@ -1823,7 +1782,6 @@ mod tests { port_c, vec![], true, - None, ).await; // create node b with miner pool let (node_b, mut wallet_b) = create_empty_node_genisis( @@ -1831,7 +1789,6 @@ mod tests { port_b, vec![Peer::new(wallet_c.address, ip_address_c, port_c)], true, - Some(MinerPool::new()), ) .await; let (mut node_a, wallet_a) = create_empty_node_genisis( @@ -1839,7 +1796,6 @@ mod tests { port_a, vec![], true, - None, ) .await; @@ -1942,7 +1898,6 @@ mod tests { port_b, vec![], true, - Some(MinerPool::new()), ) .await; let (mut node_a, wallet_a) = create_empty_node_genisis( @@ -1950,7 +1905,6 @@ mod tests { port_a, vec![], true, - None, ) .await; @@ -1983,7 +1937,6 @@ mod tests { port_c, vec![], false, - None, ) .await; @@ -1992,7 +1945,6 @@ mod tests { port_b, vec![], true, - None, ) .await; let (mut node_a, wallet_a) = create_empty_node_genisis( @@ -2000,7 +1952,6 @@ mod tests { port_a, vec![], false, - None, ) .await; @@ -2042,4 +1993,128 @@ mod tests { } + async fn save_node(node: Node) -> TempDir{ + let path = tempfile::tempdir().unwrap(); + let persistence_manager = PersistenceManager::new(Some(path.path().to_path_buf())); + persistence_manager.save_node(&node).await.unwrap(); + path + } + + #[tokio::test] + async fn test_sync_two_blocks_persistence() { + let ip_address_a = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 8)); + let port_a = 6999; + + let ip_address_b = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 9)); + let port_b = 6998; + + let ip_address_c = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 5)); + let port_c = 6997; + + + let (mut node_b, wallet_b) = create_empty_node_genisis( + ip_address_b, + port_b, + vec![], + true, + ) + .await; + + let (mut node_a, mut wallet_a) = create_empty_node_genisis( + ip_address_a, + port_a, + vec![Peer::new(wallet_b.address, ip_address_b, port_b)], + true, + ) + .await; + + let (mut node_c, mut wallet_c) = create_empty_node_genisis( + ip_address_c, + port_c, + vec![Peer::new(wallet_a.address, ip_address_a, port_a)], + false, + ) + .await; + + inner_test_new_node_online( + &mut node_a, + &mut node_b, + &mut node_c, + &mut wallet_a, + &wallet_b, + &wallet_c, + ) + .await; + // now, let A go offline. C will submit a transaction. + println!("Stopping node A - expect ConnetionRefused errors"); + node_a.stop().await; + let path_a = save_node(node_a).await; + println!("{path_a:?}"); + // pause a sec - TODO remove this after fixing the join on stoping + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + // now, C will submit a transaction to B + let _ = submit_transaction( + &mut node_c, + &mut wallet_c, + wallet_b.address, + 0, + false, + None, + ) + .await + .unwrap(); + // pause a sec + tokio::time::sleep(std::time::Duration::from_secs(10)).await; // time to settle, and give up waiting for full block + // now both shold have 3 blocks + let chain_b = node_b.inner.chain.lock().await; + assert!(chain_b.as_ref().unwrap().depth == 2); // 3 blocks + assert!(chain_b.as_ref().unwrap().blocks.len() == 3); // 3 blocks + drop(chain_b); + // now c + let chain_c = node_c.inner.chain.lock().await; + assert!(chain_c.as_ref().unwrap().depth == 2); // 3 blocks + assert!(chain_c.as_ref().unwrap().blocks.len() == 3); // 3 blocks + drop(chain_c); + // DO A SECOND TRANSACTION TO GET 2 BLOCKS OUTDATED + let _ = submit_transaction( + &mut node_c, + &mut wallet_c, + wallet_b.address, + 0, + false, + None, + ) + .await + .unwrap(); + // pause a sec + tokio::time::sleep(std::time::Duration::from_secs(15)).await; // time to settle, and give up waiting for full block + // now both shold have 4 blocks + let chain_b = node_b.inner.chain.lock().await; + assert!(chain_b.as_ref().unwrap().depth == 3); // 4 blocks + assert!(chain_b.as_ref().unwrap().blocks.len() == 4); // 4 blocks + drop(chain_b); + // now c + let chain_c = node_c.inner.chain.lock().await; + assert!(chain_c.as_ref().unwrap().depth == 3); // 4 blocks + assert!(chain_c.as_ref().unwrap().blocks.len() == 4); // 4 blocks + drop(chain_c); + // put A back online + let mut node_a = Node::new_load(&PersistenceManager::new(Some(path_a.path().to_path_buf()))).await.expect("Failed to load node A"); + node_a.serve().await; + assert!(node_a.inner.state.read().await.clone() == NodeState::ChainSyncing); + // wait for a bit + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + // check if node a knows about node b and c + let peers_a = node_a.inner.peers.read().await; + assert!(peers_a.contains_key(&wallet_b.address)); + assert!(peers_a.contains_key(&wallet_c.address)); + drop(peers_a); + // now check that it collected the block + let chain_a = node_a.inner.chain.lock().await; + assert_eq!(chain_a.as_ref().unwrap().depth, 3); // 3 blocks + assert_eq!(chain_a.as_ref().unwrap().blocks.len(), 4); // 3 blocks + // now, node A should be in serving state + assert!(node_a.inner.state.read().await.clone() == NodeState::Serving); + } + } diff --git a/libs/pillar_core/src/nodes/node.rs b/libs/pillar_core/src/nodes/node.rs index cb183ac..10a531f 100644 --- a/libs/pillar_core/src/nodes/node.rs +++ b/libs/pillar_core/src/nodes/node.rs @@ -2,16 +2,13 @@ use super::peer::Peer; use flume::{Receiver, Sender}; use pillar_crypto::{hashing::{DefaultHash, Hashable}, signing::{DefaultSigner, SigFunction, Signable}, types::StdByteArray}; use tracing::instrument; -use std::{collections::{HashMap, HashSet}, net::IpAddr, sync::Arc}; +use std::{collections::{HashMap, HashSet}, net::IpAddr, path::PathBuf, sync::Arc}; use tokio::sync::{Mutex, RwLock}; use crate::{ - blockchain::chain::Chain, - persistence::database::{Datastore, EmptyDatastore}, - primitives::{block::{Block, BlockHeader, Stamp}, messages::Message, pool::MinerPool, transaction::{FilterMatch, TransactionFilter}}, - protocol::{chain::{block_settle_consumer, discover_chain, service_sync, sync_chain}, + accounting::state::StateManager, blockchain::chain::Chain, persistence::{self, manager::PersistenceManager}, primitives::{block::{Block, BlockHeader, Stamp}, messages::Message, pool::MinerPool, transaction::{FilterMatch, TransactionFilter}}, protocol::{chain::{block_settle_consumer, discover_chain, service_sync, sync_chain}, communication::{broadcast_knowledge, serve_peers}, - reputation::{nth_percentile_peer, N_TRANSMISSION_SIGNATURES}}, + reputation::{nth_percentile_peer, N_TRANSMISSION_SIGNATURES}} }; /// Operational state of a node, which controls how it treats incoming data. @@ -61,9 +58,9 @@ impl NodeState { } } -impl Into for NodeState { - fn into(self) -> String { - match self { +impl From for String { + fn from(val: NodeState) -> Self { + match val { NodeState::ChainLoading => "ChainLoading", NodeState::ChainOutdated => "ChainOutdated", NodeState::ChainSyncing => "ChainSyncing", @@ -75,26 +72,6 @@ impl Into for NodeState { } } -fn get_initial_state(datastore: &dyn Datastore) -> (NodeState, Option) { - // check if the datastore has a chain - if datastore.latest_chain().is_some() { - // if it does, load the chain - match datastore.load_chain() { - Ok(chain) => { - // assign the chain to the node - (NodeState::ChainOutdated, Some(chain)) - }, - Err(_) => { - // if it fails, we are in discovery mode - (NodeState::ICD, None) - } - } - } else { - // if it does not, we are in discovery mode - (NodeState::ICD, None) - } -} - /// Shared inner state of a node (behind `Arc`). pub struct NodeInner { pub public_key: StdByteArray, @@ -110,8 +87,6 @@ pub struct NodeInner { pub broadcasted_already: RwLock>, // transaction filter queue pub transaction_filters: Mutex>, - /// the datastore - pub datastore: Option>, /// A queue of blocks which are to be settled to the chain pub late_settle_queue: lfqueue::UnboundedQueue, /// the state represents the nodes ability to communicate with other nodes @@ -136,8 +111,6 @@ pub struct Node { kill_settle: Option>, } - - impl Node { /// Create a new node #[instrument( @@ -149,19 +122,53 @@ impl Node { port = port ) )] - pub fn new( + pub fn new<'a>( public_key: StdByteArray, private_key: StdByteArray, ip_address: IpAddr, port: u16, peers: Vec, - mut database: Option>, - transaction_pool: Option, + genesis: bool + ) -> Self { + let state = if genesis { NodeState::ChainOutdated } else { NodeState::ICD }; + let chain = if genesis { Some(Chain::new_with_genesis()) } else { None }; + tracing::debug!("Node initial state: {:?}", state); + Node::new_inner(public_key, private_key, ip_address, port, peers, chain, state) + } + + pub async fn new_load( + persistence_manager: &PersistenceManager, + ) -> Result{ + let chain = persistence_manager.load_chain().await?; + let node_state = persistence_manager.load_node().await?; + let state = if chain.is_some() { + NodeState::ChainOutdated + } else { + NodeState::ICD + }; + + let node = Node::new_inner( + node_state.public_key, + node_state.private_key, + node_state.ip_address, + node_state.port, + node_state.peers, + chain, + state + ); + + Ok(node) + } + + fn new_inner( + public_key: StdByteArray, + private_key: StdByteArray, + ip_address: IpAddr, + port: u16, + peers: Vec, + chain: Option, + state: NodeState, ) -> Self { - if database.is_none() { - tracing::warn!("Node created without a database. This will not persist the chain or transactions."); - database = Some(Arc::new(EmptyDatastore::new())); - } let broadcast_queue = lfqueue::UnboundedQueue::new(); let late_settle_queue = lfqueue::UnboundedQueue::new(); let transaction_filters = Mutex::new(Vec::new()); @@ -170,26 +177,23 @@ impl Node { .iter() .map(|peer| (peer.public_key, *peer)) .collect::>(); - tracing::info!("Node created with {} initial peers", peer_map.len()); - let (state, maybe_chain) = get_initial_state(&**database.as_ref().unwrap()); - tracing::debug!("Node initial state: {:?}", state); + Node { inner: NodeInner { public_key, private_key, peers: RwLock::new(peer_map), - chain: Mutex::new(maybe_chain), + chain: Mutex::new(chain), broadcasted_already, transaction_filters, broadcast_queue, filter_callbacks: Mutex::new(HashMap::new()), // initially no callbacks state: RwLock::new(state), // initially in discovery mode late_settle_queue, - datastore: database, }.into(), ip_address, port, - miner_pool: transaction_pool, + miner_pool: Some(MinerPool::new()), kill_broadcast: None, kill_serve: None, kill_settle: None, @@ -243,7 +247,7 @@ impl Node { Some(handle) }, _ => { - panic!("Unexpected node state for initialization: {:?}", state); + panic!("Unexpected node state for initialization: {state:?}"); } }; if let Some(handle) = handle { @@ -579,7 +583,7 @@ impl From for Peer { } } -pub trait Broadcaster { +pub(crate) trait Broadcaster { /// Broadcast a message to all peers. async fn broadcast(&self, message: &Message) -> Result, std::io::Error>; } diff --git a/libs/pillar_core/src/persistence/database.rs b/libs/pillar_core/src/persistence/database.rs deleted file mode 100644 index 6cb1ed1..0000000 --- a/libs/pillar_core/src/persistence/database.rs +++ /dev/null @@ -1,178 +0,0 @@ -use std::collections::HashMap; - - -use pillar_crypto::types::StdByteArray; - -use crate::{blockchain::chain::Chain, primitives::block::Block}; - -pub trait Datastore: Send + Sync { - /// If a chain exists on disk. - /// - /// Returns `Some(chain_timestamp)` if the chain exists, or `None` if it does not. - fn latest_chain(&self) -> Option; - - /// Loads chain from disk. - /// - /// Returns a `Chain` if it exists, or an error if it does not. - fn load_chain(&self) -> Result; - - /// Saves a chain to disk. - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error>; - - /// Saves a block to disk. - /// - /// Returns `Ok(())` if the block was saved successfully, or an error if it was not. - fn save_block(&self, block: Block) -> Result<(), std::io::Error>; - - /// Loads a block from disk. - fn load_block(&self, block_hash: &str) -> Result; - - /// sync the on disk state with a new chain - /// This will write/remove as needed to ensure the on disk state matches the chain. - fn sync_chain(&self, chain: Chain) -> Result<(), std::io::Error>; - -} - -/// The most basic datastore that is essentially memory based without any persistence. -#[derive(Clone)] -pub struct GenesisDatastore{ - chain: Chain -} - -impl GenesisDatastore { - pub fn new() -> Self { - GenesisDatastore { - chain: Chain::new_with_genesis(), - } - } -} - -impl Datastore for GenesisDatastore { - fn latest_chain(&self) -> Option { - self.chain.leaves.iter().last().map(|leaf| self.chain.blocks.get(leaf).unwrap().header.timestamp) - } - - fn load_chain(&self) -> Result { - Ok(self.chain.clone()) - } - - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error> { - self.chain = chain; - Ok(()) - } - - fn save_block(&self, _block: Block) -> Result<(), std::io::Error> { - Ok(()) - } - - fn load_block(&self, _block_hash: &str) -> Result { - unimplemented!() - } - - fn sync_chain(&self, _chain: Chain) -> Result<(), std::io::Error> { - unimplemented!() - } -} - -/// This datastore never provides any chain, but it can store. -pub struct EmptyDatastore{ - chain: Option -} - -impl EmptyDatastore { - pub fn new() -> Self { - EmptyDatastore { - chain: None, - } - } -} - -impl Datastore for EmptyDatastore { - fn latest_chain(&self) -> Option { - match &self.chain{ - Some(chain) => chain.leaves.iter().last().map(|leaf| chain.blocks.get(leaf).unwrap().header.timestamp), - None => None, - } - } - - fn load_chain(&self) -> Result { - match &self.chain { - Some(chain) => Ok(chain.clone()), - None => Err(std::io::Error::new(std::io::ErrorKind::NotFound, "No chain found")), - } - } - - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error> { - self.chain = Some(chain); - Ok(()) - } - - fn save_block(&self, _block: Block) -> Result<(), std::io::Error> { - Ok(()) - } - - fn load_block(&self, _block_hash: &str) -> Result { - unimplemented!() - } - - fn sync_chain(&self, _chain: Chain) -> Result<(), std::io::Error> { - unimplemented!() - } -} - - - - - -pub struct SledDatastore { - data: sled::Db, -} - -impl SledDatastore { - pub fn new(address: String) -> Self { - SledDatastore { - data: sled::open(address).expect("Failed to open sled database"), - } - } -} - -impl Datastore for SledDatastore { - fn latest_chain(&self) -> Option { - let timestamp = self.data.get("latest_timestamp"); - match timestamp { - Ok(Some(value)) => { - let timestamp = u64::from_le_bytes( - value.as_ref().try_into().expect("Failed to convert timestamp to u64") - ); - timestamp.into() - }, - _ => None, - } - } - - fn load_chain(&self) -> Result { - let leaf_hashes = self.data.get("leaf_hashes") - .map_err(std::io::Error::other)?; - // the leaf hash's will point off to the respective blocks. then, we will work our way backwards, loading each block. - // from there, we reconstruct the chain - let mut blocks: HashMap; - todo!(); - - } - - fn save_chain(&mut self, chain: Chain) -> Result<(), std::io::Error> { - todo!() - } - - fn save_block(&self, block: Block) -> Result<(), std::io::Error> { - todo!() - } - - fn load_block(&self, block_hash: &str) -> Result { - todo!() - } - - fn sync_chain(&self, chain: Chain) -> Result<(), std::io::Error> { - todo!() - } -} \ No newline at end of file diff --git a/libs/pillar_core/src/persistence/manager.rs b/libs/pillar_core/src/persistence/manager.rs new file mode 100644 index 0000000..f5990ce --- /dev/null +++ b/libs/pillar_core/src/persistence/manager.rs @@ -0,0 +1,100 @@ +use std::{net::IpAddr, path::PathBuf}; + +use pillar_crypto::types::StdByteArray; + +use crate::{blockchain::chain::Chain, nodes::{node::{self, Node}, peer::{Peer, PillarIPAddr}}, persistence::Persistable, PROTOCOL_PORT}; + + + +pub struct PersistenceManager { + pub root: PathBuf, + chain_root: PathBuf, + node_root: PathBuf, +} + +pub(crate) struct NodeShard { + pub ip_address: IpAddr, + pub port: u16, + pub peers: Vec, + pub public_key: StdByteArray, + pub private_key: StdByteArray +} + +impl PersistenceManager { + /// Creates a new state manager working off + /// the root. + /// + /// If None, defaults to ~/.pillar + pub fn new(root: Option) -> Self { + let root = match root { + Some(a) => a, + None => { + tracing::info!("State setting up at ~/.pillar"); + let home = dirs::home_dir().expect("Cannot startup state without home dir if a path is not specified."); + let r = home.join(".pillar"); + if ! std::fs::exists(&r).unwrap() { + tracing::info!("Creating folder {}", r.display()); + std::fs::create_dir_all(&r).unwrap(); + } + r + } + }; + if !std::fs::exists(root.join("chain")).unwrap() { + std::fs::create_dir_all(root.join("chain")).unwrap(); + } + if !std::fs::exists(root.join("node")).unwrap() { + std::fs::create_dir_all(root.join("node")).unwrap(); + } + Self { + root: root.clone(), + chain_root: root.clone().join("chain"), + node_root: root.join("node"), + } + } + + pub(crate) async fn save_node(&self, node: &Node) -> Result<(), std::io::Error>{ + if let Some(chain) = node.inner.chain.lock().await.as_ref() { + chain.save(&self.chain_root).await?; + } + + let address: PillarIPAddr = node.ip_address.into(); + + node.inner.public_key.save(&self.node_root.join("public_key.bin")).await?; + node.inner.private_key.save(&self.node_root.join("private_key.bin")).await?; + node.inner.peers.read() + .await.values() + .cloned() + .collect::>() + .save(&self.node_root.join("peers.bin")).await?; + address.save(&self.node_root.join("ip_address.bin")).await?; + node.port.save(&self.node_root.join("port.bin")).await?; + + Ok(()) + } + + pub(crate) async fn load_node(&self) -> Result { + let public_key = StdByteArray::load(&self.node_root.join("public_key.bin")).await?; + let private_key = StdByteArray::load(&self.node_root.join("private_key.bin")).await?; + let peers = Vec::::load(&self.node_root.join("peers.bin")).await?; + let ip_address: PillarIPAddr = PillarIPAddr::load(&self.node_root.join("ip_address.bin")).await?; + let port = u16::load(&self.node_root.join("port.bin")).await?; + + Ok(NodeShard { + ip_address: ip_address.into(), + port, + peers, + public_key, + private_key + }) + + } + + pub async fn load_chain(&self) -> Result, std::io::Error> { + if std::fs::metadata(self.chain_root.join("meta.bin")).is_ok() { + Ok(Some(Chain::load(&self.chain_root).await?)) + } else { + Ok(None) + } + } + +} \ No newline at end of file diff --git a/libs/pillar_core/src/persistence/mod.rs b/libs/pillar_core/src/persistence/mod.rs index 3ea3f13..a13b991 100644 --- a/libs/pillar_core/src/persistence/mod.rs +++ b/libs/pillar_core/src/persistence/mod.rs @@ -1 +1,303 @@ -pub mod database; \ No newline at end of file +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, +}; + +use bytemuck::{Pod, Zeroable}; +use pillar_crypto::types::StdByteArray; +use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; + +use crate::{ + accounting::state::StateManager, blockchain::chain::Chain, nodes::{node::Node, peer::PillarIPAddr}, primitives::block::{Block, BlockHeader} +}; +pub mod manager; + +pub(crate) trait Persistable +where + Self: PillarSerialize, +{ + async fn save(&self, path: &PathBuf) -> Result<(), std::io::Error> { + let bytes = self.serialize_pillar()?; + tokio::fs::write(path, bytes).await + } + async fn load(path: &PathBuf) -> Result { + let bytes = tokio::fs::read(path).await?; + Self::deserialize_pillar(&bytes) + } +} + +impl Persistable for Vec where V: PillarSerialize {} +impl Persistable for HashMap +where + K: PillarSerialize + std::hash::Hash + Eq, + V: PillarSerialize, +{} + +impl Persistable for String {} + +impl Persistable for Block {} // we are using the default implementations, which is just serialize and save to a path +impl Persistable for StateManager {} + +#[inline] +fn load_blocks_from_dir(path: &PathBuf) -> Result, std::io::Error> { + let block_files = std::fs::read_dir(path)?; + let mut blocks: HashMap = HashMap::new(); + for entry in block_files { + let entry = entry?; + if entry.file_type()?.is_file() && entry.file_name().len() == 68 { + let hash: StdByteArray = hex::decode(entry.file_name().to_string_lossy().split(".").next().unwrap()) + .unwrap() + .try_into() + .map_err(|_| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid hash in filename") + })?; + let block = Block::deserialize_pillar(&std::fs::read(entry.path())?)?; + blocks.insert(hash, block); + } + } + Ok(blocks) +} + +#[inline] +fn get_headers_from_blocks( + blocks: &HashMap, +) -> HashMap { + let mut headers = HashMap::new(); + for (hash, block) in blocks { + headers.insert(*hash, block.header); + } + headers +} + +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C)] +struct ChainMeta { + deepest_hash: StdByteArray, + depth: u64, + _padding: [u8; 24], // padding to make the struct size a multiple of 32 bytes +} + +impl PillarNativeEndian for ChainMeta { + fn to_le(&mut self) { + self.depth = self.depth.to_le(); + } +} +impl PillarFixedSize for ChainMeta {} +impl Persistable for ChainMeta {} +impl Persistable for StdByteArray {} +impl Persistable for PillarIPAddr {} +impl Persistable for u16 {} + +// chains serialization is to save one block per file +impl Persistable for Chain { + async fn save(&self, path: &std::path::PathBuf) -> Result<(), std::io::Error> { + if !std::fs::metadata(path)?.is_dir() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Path must be a directory", + )); + } + for (block_hash, block) in &self.blocks { + // println!("{}", hex::encode(block_hash)); + let block_path = path.join(format!("{}.bin", hex::encode(block_hash))); + block.save(&block_path).await?; + } + let metadata = ChainMeta { + deepest_hash: self.deepest_hash, + depth: self.depth, + _padding: [0u8; 24], + }; + let meta_path = path.join("meta.bin"); + let state_manager_path = path.join("state.bin"); + let leaves_path = path.join("leaves.bin"); + metadata.save(&meta_path).await?; + // TODO this clone sus af (implment serialization for reference types) + self.leaves + .iter() + .cloned() + .collect::>() + .save(&leaves_path) + .await?; + self.state_manager.save(&state_manager_path).await?; + Ok(()) + } + + async fn load(path: &std::path::PathBuf) -> Result { + if !std::fs::metadata(path)?.is_dir() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Path must be a directory", + )); + } + let blocks = load_blocks_from_dir(path)?; + let headers = get_headers_from_blocks(&blocks); + let metadata = ChainMeta::load(&path.join("meta.bin")).await?; + let state_manager = StateManager::load(&path.join("state.bin")).await?; + let leaves = Vec::::load(&path.join("leaves.bin")) + .await? + .iter() + .cloned() + .collect::>(); + + let chain = Chain { + deepest_hash: metadata.deepest_hash, + depth: metadata.depth, + blocks, + headers, + leaves, + state_manager, + }; + Ok(chain) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::path::PathBuf; + + use pillar_crypto::hashing::DefaultHash; + use pillar_crypto::signing::{DefaultSigner, SigFunction, SigVerFunction, Signable}; + + use crate::blockchain::chain::Chain; + use crate::persistence::Persistable; + use crate::primitives::block::{Block, BlockTail}; + use crate::primitives::transaction::Transaction; + use crate::protocol::pow::mine; + + #[tokio::test] + async fn test_persist_block() { + let t = Transaction::new([1; 32], [2; 32], 0, 0, 0, &mut DefaultHash::new()); + let block = Block::new( + [0; 32], + 0, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + vec![t], + None, + BlockTail::default().stamps, + 0, + None, + None, + &mut DefaultHash::new(), + ); + let file = tempfile::NamedTempFile::new().unwrap(); + let path = PathBuf::from(file.path()); + block.save(&path).await.unwrap(); + + let loaded_block = Block::load(&path).await.unwrap(); + assert_eq!(block, loaded_block); + } + + #[tokio::test] + async fn test_chain_persistence() { + let mut chain = Chain::new_with_genesis(); + let mut signing_key = DefaultSigner::generate_random(); + let sender = signing_key.get_verifying_function().to_bytes(); + + let mut main_hash = chain.deepest_hash; + let genesis_hash = main_hash; + + // Extend the main chain to depth 10 + for depth in 1..=15 { + let mut transaction = + Transaction::new(sender, [2; 32], 0, 0, depth - 1, &mut DefaultHash::new()); + transaction.sign(&mut signing_key); + let mut block = Block::new( + main_hash, + 0, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + + depth, + vec![transaction], + None, + BlockTail::default().stamps, + depth, + None, + None, + &mut DefaultHash::new(), + ); + let prev_header = chain + .headers + .get(&block.header.previous_hash) + .expect("Previous block header not found"); + let state_root = + chain + .state_manager + .branch_from_block_internal(&block, prev_header, &sender); + mine( + &mut block, + sender, + state_root, + vec![], + None, + DefaultHash::new(), + ) + .await; + main_hash = block.header.completion.as_ref().unwrap().hash; + chain.add_new_block(block).unwrap(); + } + + // Create a fork that shares some nodes with the main chain + let mut fork_hash = genesis_hash; + for depth in 1..=3 { + let mut transaction = + Transaction::new(sender, [2; 32], 0, 0, depth - 1, &mut DefaultHash::new()); + transaction.sign(&mut signing_key); + let mut fork_block = Block::new( + fork_hash, + 0, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + + depth + + 15, + vec![transaction], + None, + BlockTail::default().stamps, + depth, + None, + None, + &mut DefaultHash::new(), + ); + let prev_header = chain + .headers + .get(&fork_block.header.previous_hash) + .expect("Previous block header not found"); + let state_root = + chain + .state_manager + .branch_from_block_internal(&fork_block, prev_header, &sender); + mine( + &mut fork_block, + sender, + state_root, + vec![], + None, + DefaultHash::new(), + ) + .await; + fork_hash = fork_block.header.completion.as_ref().unwrap().hash; + chain.add_new_block(fork_block).unwrap(); + } + + // write the chain, then get a new one + + let temp_dir = tempfile::tempdir().unwrap(); + let path = PathBuf::from(temp_dir.path()); + chain.save(&path).await.unwrap(); + + let new_chain = Chain::load(&path).await.unwrap(); + + assert_eq!(new_chain.depth, chain.depth); + assert_eq!(new_chain.leaves, chain.leaves); + assert_eq!(new_chain.blocks.len(), chain.blocks.len()); + assert_eq!(new_chain.headers.len(), chain.headers.len()); + assert_eq!(new_chain.deepest_hash, chain.deepest_hash); + } +} diff --git a/libs/pillar_core/src/primitives/implementations/block.rs b/libs/pillar_core/src/primitives/implementations/block.rs index 00faf26..0bdf8ca 100644 --- a/libs/pillar_core/src/primitives/implementations/block.rs +++ b/libs/pillar_core/src/primitives/implementations/block.rs @@ -74,4 +74,5 @@ impl Block { false } } -} \ No newline at end of file +} + diff --git a/libs/pillar_core/src/primitives/implementations/tail.rs b/libs/pillar_core/src/primitives/implementations/tail.rs index 008f914..8eaf387 100644 --- a/libs/pillar_core/src/primitives/implementations/tail.rs +++ b/libs/pillar_core/src/primitives/implementations/tail.rs @@ -1,4 +1,4 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; use pillar_crypto::{hashing::Hashable, signing::{DefaultVerifier, SigVerFunction, Signable}, types::StdByteArray}; diff --git a/libs/pillar_core/src/primitives/pool.rs b/libs/pillar_core/src/primitives/pool.rs index aeab9f1..342cb90 100644 --- a/libs/pillar_core/src/primitives/pool.rs +++ b/libs/pillar_core/src/primitives/pool.rs @@ -21,6 +21,12 @@ pub struct MinerPool { /// Transaction pool for now is just a vector of transactions /// In the future, it will be a more complex structure - perhaps a max heap on the transaction fee /// Rn, FIFO +impl Default for MinerPool { + fn default() -> Self { + Self::new() + } +} + impl MinerPool{ pub fn new() -> Self { let (mine_abort_sender, mine_abort_receiver) = flume::unbounded(); diff --git a/libs/pillar_core/src/protocol/chain.rs b/libs/pillar_core/src/protocol/chain.rs index c2a0fb6..c909f55 100644 --- a/libs/pillar_core/src/protocol/chain.rs +++ b/libs/pillar_core/src/protocol/chain.rs @@ -268,7 +268,7 @@ pub async fn service_sync(node: Node, leaves: &Vec) -> Result { Ok(peer) } - _ => Err(std::io::Error::other(format!("Invalid message received: {:?}", response))), + _ => Err(std::io::Error::other(format!("Invalid message received: {response:?}"))), } } @@ -84,8 +84,7 @@ mod tests { IpAddr::V4(Ipv4Addr::from_str("127.0.0.1").unwrap()), 8080, vec![existing_peer], - None, - None, + false ); // Mock new peer to be discovered diff --git a/libs/pillar_core/src/protocol/serialization.rs b/libs/pillar_core/src/protocol/serialization.rs index 4c3dfc7..8cc4d44 100644 --- a/libs/pillar_core/src/protocol/serialization.rs +++ b/libs/pillar_core/src/protocol/serialization.rs @@ -1,17 +1,18 @@ use std::collections::HashMap; -use pillar_crypto::types::{StdByteArray, STANDARD_ARRAY_LENGTH}; +use pillar_crypto::{merkle_trie::MerkleTrie, types::{StdByteArray, STANDARD_ARRAY_LENGTH}}; use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; use tokio::{io::AsyncReadExt, net::TcpStream}; -use crate::{accounting::{account::{Account, TransactionStub}, state::StateManager}, blockchain::{chain::Chain, chain_shard::ChainShard}, nodes::peer::Peer, primitives::{block::{Block, BlockHeader}, messages::Message, transaction::{Transaction, TransactionFilter}}, reputation::history::{HeaderShard, NodeHistory}}; +use crate::{accounting::{account::{Account, TransactionStub}, state::StateManager}, blockchain::{chain::Chain, chain_shard::ChainShard}, nodes::peer::{Peer, PillarIPAddr}, primitives::{block::{Block, BlockHeader}, messages::Message, transaction::{Transaction, TransactionFilter}}, reputation::history::{HeaderShard, NodeHistory}}; impl PillarFixedSize for BlockHeader {} impl PillarFixedSize for Transaction {} impl PillarFixedSize for Peer {} impl PillarFixedSize for TransactionStub {} impl PillarFixedSize for HeaderShard {} +impl PillarFixedSize for PillarIPAddr {} /// Pack a serialized `Message` with a 4-byte little-endian length prefix. /// @@ -165,6 +166,24 @@ impl PillarSerialize for crate::primitives::messages::Message { } +impl PillarSerialize for StateManager { + fn serialize_pillar(&self) -> Result, std::io::Error> { + let mut bytes = vec![]; + let trie_bytes = self.state_trie.serialize_pillar()?; + bytes.extend((trie_bytes.len() as u32).to_le_bytes()); + bytes.extend(trie_bytes); + bytes.extend(self.reputations.serialize_pillar()?); + Ok(bytes) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + let trielen = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize; + let trie = MerkleTrie::::deserialize_pillar(&data[4..trielen + 4])?; + let reputations = HashMap::::deserialize_pillar(&data[trielen + 4..])?; + Ok(StateManager { state_trie: trie, reputations }) + } +} + impl PillarSerialize for Block{ /// Serialize a `Block` as `[header][transactions...]`. /// @@ -214,6 +233,7 @@ impl PillarSerialize for Chain { buffer.extend(self.deepest_hash.serialize_pillar()?); // TODO maybe big clone buffer.extend(self.leaves.iter().cloned().collect::>().serialize_pillar()?); + // Ok(buffer) } @@ -245,8 +265,8 @@ impl PillarSerialize for Chain { let leaves = Vec::::deserialize_pillar(&data[offset..])?; Ok(Chain { - blocks, - headers, + blocks, + headers, depth, deepest_hash, leaves: leaves.into_iter().collect(), @@ -354,6 +374,10 @@ impl PillarNativeEndian for HeaderShard { } } +impl PillarNativeEndian for PillarIPAddr { + fn to_le(&mut self) {} +} + impl PillarSerialize for NodeHistory { /// Serialize a node's history as `[public_key:32][blocks_mined_len:u32][blocks_mined_bytes][blocks_stamped...]`. /// @@ -408,17 +432,13 @@ mod tests { use pillar_serialize::PillarSerialize; use crate::{ - primitives::{ + accounting::account::TransactionStub, blockchain::{chain::Chain, chain_shard::ChainShard}, nodes::peer::Peer, primitives::{ block::{Block, BlockHeader, Stamp}, messages::Message, transaction::{Transaction, TransactionFilter}, - }, - protocol::{reputation::N_TRANSMISSION_SIGNATURES, versions::Versions}, - blockchain::{chain::Chain, chain_shard::ChainShard}, - nodes::peer::Peer, - accounting::account::TransactionStub, + }, protocol::{reputation::N_TRANSMISSION_SIGNATURES, versions::Versions} }; - use pillar_crypto::{types::StdByteArray, proofs::{MerkleProof, HashDirection}}; + use pillar_crypto::proofs::{MerkleProof, HashDirection}; use std::net::{IpAddr, Ipv4Addr}; #[test] diff --git a/libs/pillar_crypto/src/merkle_trie.rs b/libs/pillar_crypto/src/merkle_trie.rs index 22fe8a1..49bf1b9 100644 --- a/libs/pillar_crypto/src/merkle_trie.rs +++ b/libs/pillar_crypto/src/merkle_trie.rs @@ -4,13 +4,14 @@ //! into nibbles (0-15). Values are serialized using `PillarSerialize`. The //! structure supports multiple roots for branching state (e.g., competing //! chain tips) while sharing unchanged subtrees. -use std::{collections::{HashMap, HashSet, VecDeque}, fmt::Debug, marker::PhantomData}; +use std::{collections::{HashMap, HashSet, VecDeque}, fmt::Debug, hash::Hash, marker::PhantomData}; -use pillar_serialize::PillarSerialize; -use slotmap::{new_key_type, SlotMap}; +use pillar_serialize::{PillarFixedSize, PillarNativeEndian, PillarSerialize}; +use slotmap::{new_key_type, KeyData, SlotMap}; use crate::{hashing::{DefaultHash, HashFunction, Hashable}, types::StdByteArray}; + new_key_type! { pub struct NodeKey; } /// In order to store account states, a Merkle Patricia Trie will be used @@ -55,6 +56,7 @@ impl TrieNode{ } +#[derive(Clone)] pub struct MerkleTrie { _phantum: PhantomData, pub(crate) nodes: SlotMap>, // SlotMap to store Trie nodes @@ -336,7 +338,89 @@ impl MerkleTrie { } +impl PillarSerialize for NodeKey { + fn serialize_pillar(&self) -> Result, std::io::Error> { + Ok(self.0.as_ffi().serialize_pillar()?) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + Ok(NodeKey(KeyData::from_ffi( + u64::from_le_bytes(data[0..8].try_into().unwrap()) + ))) + } +} +impl PillarSerialize for MerkleTrie { + fn serialize_pillar(&self) -> Result, std::io::Error> { + let mut buffer = Vec::new(); + buffer.extend((self.nodes.len() as u32).to_le_bytes()); + for (key, node) in &self.nodes { + buffer.extend(key.serialize_pillar()?); // 8 bytes + buffer.extend(node.references.serialize_pillar()?); // 2 bytes + let vbuff = node.value.serialize_pillar()?; + buffer.extend((vbuff.len() as u32).to_le_bytes()); + buffer.extend(vbuff); + } + for (_, node) in &self.nodes { + // do children now + let cbuff = node.children.serialize_pillar()?; + buffer.extend((cbuff.len() as u32).to_le_bytes()); + buffer.extend(cbuff); + } + buffer.extend(self.roots.serialize_pillar()?); + Ok(buffer) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + let mut key_map: HashMap = HashMap::new(); + let n = u32::from_le_bytes(data[0..4].try_into().unwrap()); + let mut offset = 4; + let mut nodes = SlotMap::with_key(); + let mut order = vec![]; + for _ in 0..n { + let key = NodeKey::deserialize_pillar(&data[offset..offset + 8])?; + offset+=8; + let references = u16::from_le_bytes(data[offset..offset+2].try_into().unwrap()); + offset += 2; + let vlength = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + let value = Option::>::deserialize_pillar(&data[offset..offset + vlength])?; + offset += vlength; + let mut node: TrieNode = TrieNode::new(); + node.value = value; + node.references = references; + let new_key = nodes.insert(node); + key_map.insert(key, new_key); + order.push(new_key); + } + + for i in 0..n { + let length = u32::from_le_bytes(data[offset..offset+4].try_into().unwrap()) as usize; + offset += 4; + let children: [Option; 16] = <[Option; 16]>::deserialize_pillar(&data[offset..offset + length])? + .map(|old_key: Option|{ + match old_key { + None => None, + Some(key) => key_map.get(&key).cloned() + } + }); + offset += length; + let node = nodes.get_mut(*order.get(i as usize).unwrap()).unwrap(); + node.children = children; + } + + let mut roots: HashMap = HashMap::::deserialize_pillar(&data[offset..])?; + for (k, v) in roots.clone() { + roots.insert(k.clone(), key_map.get(&v).cloned().unwrap()); + } + + Ok(MerkleTrie { + roots: roots, + _phantum: PhantomData, + nodes: nodes + }) + } +} #[cfg(test)] mod tests { @@ -536,18 +620,18 @@ mod tests { fn test_proof_for_branch() { let initial_account_info = AccountState { balance: 100, nonce: 1 }; // let (mut trie, initial_root) = MerkleTrie::<&str, AccountState>::new("account0", initial_account_info.clone()); - let mut trie = MerkleTrie::<&str, AccountState>::new(); - let initial_root = trie.create_genesis("account0", initial_account_info.clone()).expect("Failed to create genesis"); + let mut trie = MerkleTrie::::new(); + let initial_root = trie.create_genesis("account0".into(), initial_account_info.clone()).expect("Failed to create genesis"); let account1 = AccountState { balance: 200, nonce: 2 }; - trie.insert("account1", account1.clone(), initial_root).unwrap(); + trie.insert("account1".into(), account1.clone(), initial_root).unwrap(); // let branch_keys = vec![("account1", AccountState { balance: 300, nonce: 3 })]; let mut branch_keys = HashMap::new(); - branch_keys.insert("account1", AccountState { balance: 300, nonce: 3 }); + branch_keys.insert("account1".into(), AccountState { balance: 300, nonce: 3 }); let new_root = trie.branch(Some(initial_root), branch_keys).unwrap(); - let (proof, _) = generate_proof_of_state(&trie, "account1", Some(new_root), &mut DefaultHash::new()).expect("Proof generation failed for branch"); + let (proof, _) = generate_proof_of_state(&trie, "account1".into(), Some(new_root), &mut DefaultHash::new()).expect("Proof generation failed for branch"); let root_key = trie.roots.get(&new_root).expect("Root not found"); @@ -564,6 +648,29 @@ mod tests { &mut DefaultHash::new(), ); assert!(valid, "Proof verification failed for branch"); + + // test serialize + let bytes = trie.serialize_pillar().unwrap(); + let trie2 = MerkleTrie::::deserialize_pillar(&bytes).unwrap(); + + let (proof, _) = generate_proof_of_state(&trie2, "account1".into(), Some(new_root), &mut DefaultHash::new()).expect("Proof generation failed for branch"); + + let root_key = trie2.roots.get(&new_root).expect("Root not found"); + + let valid = proof.verify( + AccountState { balance: 300, nonce: 3 }.serialize_pillar().unwrap(), + trie2.get_hash_for(*root_key, &mut DefaultHash::new()).unwrap(), + &mut DefaultHash::new(), + ); + assert!(valid, "Proof verification failed for branch"); + + let valid = proof.verify( + AccountState { balance: 300, nonce: 3 }.serialize_pillar().unwrap(), + trie2.get_hash_for(*root_key, &mut DefaultHash::new()).unwrap(), + &mut DefaultHash::new(), + ); + assert!(valid, "Proof verification failed for branch"); + } #[test] @@ -661,23 +768,23 @@ mod tests { #[test] fn test_get_all() { let initial_account_info = AccountState { balance: 100, nonce: 1 }; - let mut trie = MerkleTrie::<&str, AccountState>::new(); - let initial_root = trie.create_genesis("account0", initial_account_info.clone()).expect("Failed to create genesis"); + let mut trie = MerkleTrie::::new(); + let initial_root = trie.create_genesis("account0".into(), initial_account_info.clone()).expect("Failed to create genesis"); let account1 = AccountState { balance: 200, nonce: 2 }; - trie.insert("account1", account1.clone(), initial_root).unwrap(); + trie.insert("account1".into(), account1.clone(), initial_root).unwrap(); let account2 = AccountState { balance: 300, nonce: 3 }; - trie.insert("account2", account2.clone(), initial_root).unwrap(); + trie.insert("account2".into(), account2.clone(), initial_root).unwrap(); let account3 = AccountState { balance: 400, nonce: 4 }; - trie.insert("account3", account3.clone(), initial_root).unwrap(); + trie.insert("account3".into(), account3.clone(), initial_root).unwrap(); // branch let account4 = AccountState { balance: 4000, nonce: 0}; let mut updates = HashMap::new(); - updates.insert("account4", account4.clone()); + updates.insert("account4".into(), account4.clone()); let _ = trie.branch(Some(initial_root), updates).unwrap(); let all_values = trie.get_all(initial_root); @@ -688,6 +795,20 @@ mod tests { assert!(all_values.contains(&account2)); assert!(all_values.contains(&account3)); assert!(!all_values.contains(&account4)); + + // quick serialize + let bytes = trie.serialize_pillar().unwrap(); + let trie2 = MerkleTrie::::deserialize_pillar(&bytes).unwrap(); + + let all_values = trie2.get_all(initial_root); + + assert_eq!(all_values.len(), 4); // account0, account1, account2, account3 + assert!(all_values.contains(&initial_account_info)); + assert!(all_values.contains(&account1)); + assert!(all_values.contains(&account2)); + assert!(all_values.contains(&account3)); + assert!(!all_values.contains(&account4)); + } #[test] @@ -791,4 +912,68 @@ mod tests { } + + #[test] + fn test_serialize_trie() { + let initial_account_info = AccountState { balance: 100, nonce: 1 }; + let mut trie = MerkleTrie::::new(); + let initial_root = trie.create_genesis("account0".into(), initial_account_info.clone()).expect("Failed to create genesis"); + + let account1 = AccountState { balance: 200, nonce: 2 }; + trie.insert("account1".into(), account1.clone(), initial_root).unwrap(); + + let account2 = AccountState { balance: 300, nonce: 3 }; + trie.insert("account2".into(), account2.clone(), initial_root).unwrap(); + + // branch + let mut updates = HashMap::new(); + updates.insert("account3".into(), AccountState { balance: 2000, nonce: 44 }); + let new_root = trie.branch(Some(initial_root), updates).unwrap(); + + // branch again + let mut updates2 = HashMap::new(); + updates2.insert("account4".into(), AccountState { balance: 0, nonce: 1 }); + let new_root2 = trie.branch(Some(new_root), updates2).unwrap(); + + // check that the new root is still there + assert!(trie.get(&"account3".into(), new_root).is_some()); + assert!(trie.get(&"account4".into(), new_root2).is_some()); + + // check that the old root is still there + assert_eq!(trie.get(&"account1".into(), initial_root), Some(account1.clone())); + assert_eq!(trie.get(&"account2".into(), initial_root), Some(account2.clone())); + assert_eq!(trie.get(&"account0".into(), initial_root), Some(initial_account_info.clone())); + + // getall, make sure account 3 does not exist + let all_values = trie.get_all(initial_root); + assert_eq!(all_values.len(), 3); // account0, account1, account2 + + // trim the branch + trie.trim_branch(new_root).expect("Failed to trim branch"); + + // check that the new root is still there + assert!(trie.get(&"account3".into(), new_root).is_none()); + assert!(trie.get(&"account3".into(), new_root2).is_some()); + + // check that the old root is still there + assert_eq!(trie.get(&"account1".into(), initial_root), Some(account1.clone())); + assert_eq!(trie.get(&"account2".into(), initial_root), Some(account2.clone())); + assert_eq!(trie.get(&"account0".into(), initial_root), Some(initial_account_info.clone())); + + // ========================= test serialize ========================= + let bytes = trie.serialize_pillar().unwrap(); + let trie2 = MerkleTrie::::deserialize_pillar(&bytes).unwrap(); + + // same assettions + + // check that the new root is still there + assert!(trie2.get(&"account3".into(), new_root).is_none()); + assert!(trie2.get(&"account3".into(), new_root2).is_some()); + + // check that the old root is still there + assert_eq!(trie2.get(&"account1".into(), initial_root), Some(account1)); + assert_eq!(trie2.get(&"account2".into(), initial_root), Some(account2)); + assert_eq!(trie2.get(&"account0".into(), initial_root), Some(initial_account_info)); + + } } \ No newline at end of file diff --git a/libs/pillar_serialize/src/lib.rs b/libs/pillar_serialize/src/lib.rs index def4360..d1a3eca 100644 --- a/libs/pillar_serialize/src/lib.rs +++ b/libs/pillar_serialize/src/lib.rs @@ -89,7 +89,7 @@ impl PillarFixedSize for i8 {} impl PillarFixedSize for i16 {} impl PillarFixedSize for i32 {} impl PillarFixedSize for i64 {} -impl PillarFixedSize for [u8; 32] {} +impl PillarFixedSize for [u8; C] {} impl PillarNativeEndian for StdByteArray { fn to_le(&mut self) {} @@ -252,7 +252,7 @@ where impl PillarSerialize for HashMap where - V: PillarSerialize + V: PillarSerialize + PillarFixedSize { /// Specialized map encoding for 32-byte keys; value length is still prefixed. default fn serialize_pillar(&self) -> Result, std::io::Error> { @@ -288,7 +288,7 @@ where /// a common hashmap implmentation for address/hash to fixed size value impl PillarSerialize for HashMap where - V: PillarSerialize + PillarNativeEndian + Zeroable + Pod + V: PillarSerialize + PillarNativeEndian + Zeroable + Pod + PillarFixedSize { /// Tight map encoding for 32-byte keys and fixed-size values (no per-entry length). fn serialize_pillar(&self) -> Result, std::io::Error> { @@ -335,4 +335,29 @@ impl PillarSerialize for String { .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid UTF-8"))?; Ok(string) } +} + +impl PillarSerialize for [Option; C]{ + fn serialize_pillar(&self) -> Result, std::io::Error> { + let mut buffer = Vec::new(); + for item in self { + let internal = item.serialize_pillar()?; + buffer.extend((internal.len() as u32).to_le_bytes()); + buffer.extend(internal); + } + Ok(buffer) + } + + fn deserialize_pillar(data: &[u8]) -> Result { + let mut offset = 0; + let mut array: [Option; C] = [None; C]; + for i in 0..C { + let length = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + let item = Option::::deserialize_pillar(&data[offset..offset + length])?; + array[i] = item; + offset += length; + } + Ok(array) + } } \ No newline at end of file diff --git a/pillar/src/run.rs b/pillar/src/run.rs index 827077e..f2215ba 100644 --- a/pillar/src/run.rs +++ b/pillar/src/run.rs @@ -1,7 +1,7 @@ use std::sync::{Arc}; use axum::{extract::{Path, Query, State}, http::HeaderMap, response::IntoResponse, routing::{get, post}, Json, Router}; -use pillar_core::{accounting::{account, wallet::{self, Wallet}}, nodes::{miner::Miner, node::Node, peer::Peer}, persistence::database::GenesisDatastore, primitives::pool::MinerPool, protocol::peers::discover_peer, reputation::history::NodeHistory}; +use pillar_core::{accounting::{account, wallet::{self, Wallet}}, nodes::{miner::Miner, node::Node, peer::Peer}, primitives::pool::MinerPool, protocol::peers::discover_peer, reputation::history::NodeHistory}; use pillar_crypto::types::StdByteArray; use serde::{Deserialize, Serialize}; use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; @@ -508,8 +508,7 @@ pub async fn launch_node(config: Config, genesis: bool, miner: bool) { config.ip_address, pillar_core::PROTOCOL_PORT, vec![], - if genesis { Some(Arc::new(GenesisDatastore::new())) } else { None }, - if miner { Some(MinerPool::new()) } else { None } + genesis ); for peer in &config.wkps {