From 7af81ad9a0b25f1c456114fa79e44d8b1dfb0381 Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Thu, 22 Jan 2026 21:30:11 -0700 Subject: [PATCH 1/8] Move FutureIntersectRelations to a module --- .../src/processes/pruning_proof/build.rs | 41 +---------------- consensus/src/processes/reachability/mod.rs | 1 + .../src/processes/reachability/relations.rs | 46 +++++++++++++++++++ 3 files changed, 48 insertions(+), 40 deletions(-) create mode 100644 consensus/src/processes/reachability/relations.rs diff --git a/consensus/src/processes/pruning_proof/build.rs b/consensus/src/processes/pruning_proof/build.rs index 530c6120bd..008f5c065f 100644 --- a/consensus/src/processes/pruning_proof/build.rs +++ b/consensus/src/processes/pruning_proof/build.rs @@ -27,6 +27,7 @@ use crate::{ processes::{ ghostdag::{ordering::SortableBlock, protocol::GhostdagManager}, pruning_proof::{GhostdagReaderExt, ProofInternalError}, + reachability::relations::FutureIntersectRelations, relations::RelationsStoreExtensions, }, }; @@ -41,46 +42,6 @@ struct LevelProofContext { count: u64, } -/// A relations-store reader restricted to the future of a fixed root block (including the root). -/// -/// Only parents and children that lie within the root’s future are exposed. -/// This provides a consistent, root-relative view of relations when operating on -/// proofs or subgraphs confined to that region of the DAG. -#[derive(Clone)] -struct FutureIntersectRelations { - relations_store: T, - reachability_service: U, - root: Hash, -} - -impl FutureIntersectRelations { - fn new(relations_store: T, reachability_service: U, root: Hash) -> Self { - Self { relations_store, reachability_service, root } - } -} - -impl RelationsStoreReader for FutureIntersectRelations { - fn get_parents(&self, hash: Hash) -> Result { - self.relations_store.get_parents(hash).map(|hashes| { - // Reachability queries are safe here, since in this context all blocks are reached via `reachable_parents_at_level` - hashes.iter().copied().filter(|&h| self.reachability_service.is_dag_ancestor_of(self.root, h)).collect_vec().into() - }) - } - - fn get_children(&self, hash: Hash) -> StoreResult> { - assert!(self.reachability_service.is_dag_ancestor_of(self.root, hash), "future(root) invariant violated"); - self.relations_store.get_children(hash) - } - - fn has(&self, hash: Hash) -> Result { - Ok(self.relations_store.has(hash)? && self.reachability_service.is_dag_ancestor_of(self.root, hash)) - } - - fn counts(&self) -> Result<(usize, usize), StoreError> { - unreachable!("not expected to be called in this context") - } -} - /// Utility for creating retry-indexed temporary GHOSTDAG stores. /// /// Each call to `new_store` returns a fresh temporary `DbGhostdagStore` for the diff --git a/consensus/src/processes/reachability/mod.rs b/consensus/src/processes/reachability/mod.rs index ef4d1e252c..a45d45b21f 100644 --- a/consensus/src/processes/reachability/mod.rs +++ b/consensus/src/processes/reachability/mod.rs @@ -2,6 +2,7 @@ mod extensions; pub mod inquirer; pub mod interval; mod reindex; +pub mod relations; pub mod tests; mod tree; diff --git a/consensus/src/processes/reachability/relations.rs b/consensus/src/processes/reachability/relations.rs new file mode 100644 index 0000000000..fe2b8040c8 --- /dev/null +++ b/consensus/src/processes/reachability/relations.rs @@ -0,0 +1,46 @@ +use itertools::Itertools; +use kaspa_consensus_core::{BlockHashSet, blockhash::BlockHashes}; +use kaspa_database::prelude::{ReadLock, StoreError, StoreResult}; +use kaspa_hashes::Hash; + +use crate::model::{services::reachability::ReachabilityService, stores::relations::RelationsStoreReader}; + +/// A relations-store reader restricted to the future of a fixed root block (including the root). +/// +/// Only parents and children that lie within the root’s future are exposed. +/// This provides a consistent, root-relative view of relations when operating on +/// proofs or subgraphs confined to that region of the DAG. +#[derive(Clone)] +pub struct FutureIntersectRelations { + relations_store: T, + reachability_service: U, + root: Hash, +} + +impl FutureIntersectRelations { + pub fn new(relations_store: T, reachability_service: U, root: Hash) -> Self { + Self { relations_store, reachability_service, root } + } +} + +impl RelationsStoreReader for FutureIntersectRelations { + fn get_parents(&self, hash: Hash) -> Result { + self.relations_store.get_parents(hash).map(|hashes| { + // Reachability queries are safe here, since in this context all blocks are reached via `reachable_parents_at_level` + hashes.iter().copied().filter(|&h| self.reachability_service.is_dag_ancestor_of(self.root, h)).collect_vec().into() + }) + } + + fn get_children(&self, hash: Hash) -> StoreResult> { + assert!(self.reachability_service.is_dag_ancestor_of(self.root, hash), "future(root) invariant violated"); + self.relations_store.get_children(hash) + } + + fn has(&self, hash: Hash) -> Result { + Ok(self.relations_store.has(hash)? && self.reachability_service.is_dag_ancestor_of(self.root, hash)) + } + + fn counts(&self) -> Result<(usize, usize), StoreError> { + unreachable!("not expected to be called in this context") + } +} From 922183dc18a3c47411613f8375a4fc8386ac2a3a Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Tue, 26 Aug 2025 23:32:00 -0600 Subject: [PATCH 2/8] Split ghostdag parent selection from inc coloring The behavior of the ghostdag function remains the same, but now we can call incremental_coloring and pass it a committed selected_parent if we need to. --- consensus/src/processes/ghostdag/protocol.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consensus/src/processes/ghostdag/protocol.rs b/consensus/src/processes/ghostdag/protocol.rs index e3cf905f06..be0cbb1939 100644 --- a/consensus/src/processes/ghostdag/protocol.rs +++ b/consensus/src/processes/ghostdag/protocol.rs @@ -133,6 +133,11 @@ impl GhostdagData { let k = self.k; // Initialize new GHOSTDAG block data with the selected parent let mut new_block_data = GhostdagData::new_with_selected_parent(selected_parent, k); From 0c0b77bbd79b463608b28c7926db4b817dec75ed Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Sun, 25 Jan 2026 21:59:16 -0700 Subject: [PATCH 3/8] Implement MemoryHeaderStore --- consensus/src/model/stores/headers.rs | 94 ++++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 1 deletion(-) diff --git a/consensus/src/model/stores/headers.rs b/consensus/src/model/stores/headers.rs index afd5f3734b..e1ec127d2d 100644 --- a/consensus/src/model/stores/headers.rs +++ b/consensus/src/model/stores/headers.rs @@ -1,7 +1,9 @@ +use std::cell::RefCell; use std::sync::Arc; +use kaspa_consensus_core::{BlockHashMap, HashMapCustomHasher}; use kaspa_consensus_core::{BlockHasher, BlockLevel, header::Header}; -use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess}; +use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess, DbKey}; use kaspa_database::prelude::{CachePolicy, DB}; use kaspa_database::prelude::{StoreError, StoreResult}; use kaspa_database::registry::DatabaseStorePrefixes; @@ -222,3 +224,93 @@ impl HeaderStore for DbHeadersStore { Ok(()) } } + +#[derive(Clone)] +pub struct MemoryHeaderStore { + header_map: RefCell>>, +} + +impl Default for MemoryHeaderStore { + fn default() -> Self { + Self::new() + } +} + +impl MemoryHeaderStore { + pub fn new() -> Self { + Self { header_map: RefCell::new(BlockHashMap::new()) } + } +} + +impl HeaderStoreReader for MemoryHeaderStore { + fn get_bits(&self, hash: Hash) -> std::result::Result { + // Prefer stored header if available + let map = self.header_map.borrow(); + if let Some(h) = map.get(&hash) { + return Ok(h.bits); + } + Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Headers.as_ref(), hash))) + } + + fn get_blue_score(&self, hash: Hash) -> std::result::Result { + let map = self.header_map.borrow(); + if let Some(h) = map.get(&hash) { + return Ok(h.blue_score); + } + Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Headers.as_ref(), hash))) + } + + fn get_compact_header_data( + &self, + hash: Hash, + ) -> std::result::Result { + let map = self.header_map.borrow(); + if let Some(h) = map.get(&hash) { + let compact: crate::model::stores::headers::CompactHeaderData = + crate::model::stores::headers::CompactHeaderData::from(h.as_ref()); + return Ok(compact); + } + Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::HeadersCompact.as_ref(), hash))) + } + + fn get_daa_score(&self, hash: Hash) -> std::result::Result { + let map = self.header_map.borrow(); + if let Some(h) = map.get(&hash) { + return Ok(h.daa_score); + } + Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Headers.as_ref(), hash))) + } + + fn get_header(&self, hash: Hash) -> std::result::Result, StoreError> { + let map = self.header_map.borrow(); + if let Some(h) = map.get(&hash) { + return Ok(Arc::clone(h)); + } + Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Headers.as_ref(), hash))) + } + + fn get_header_with_block_level( + &self, + hash: Hash, + ) -> std::result::Result { + let map = self.header_map.borrow(); + if let Some(h) = map.get(&hash) { + return Ok(crate::model::stores::headers::HeaderWithBlockLevel { header: Arc::clone(h), block_level: 0 }); + } + Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Headers.as_ref(), hash))) + } + + fn get_timestamp(&self, hash: Hash) -> std::result::Result { + let map = self.header_map.borrow(); + if let Some(h) = map.get(&hash) { + return Ok(h.timestamp); + } + Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Headers.as_ref(), hash))) + } +} + +impl MemoryHeaderStore { + pub fn insert(&self, header: Arc
) { + self.header_map.borrow_mut().insert(header.hash, header); + } +} From 88c6cc737b9191135589f03d3d2494fc980e15a7 Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Tue, 28 Oct 2025 22:52:47 -0600 Subject: [PATCH 4/8] Add visualization test helpers --- consensus/Cargo.toml | 1 + consensus/src/test_helpers.rs | 105 +++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 0abcd2528c..fb3550ee8a 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -40,6 +40,7 @@ rayon.workspace = true rocksdb.workspace = true secp256k1.workspace = true serde.workspace = true +serde_json.workspace = true smallvec.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/consensus/src/test_helpers.rs b/consensus/src/test_helpers.rs index c765494dd7..03777a48b1 100644 --- a/consensus/src/test_helpers.rs +++ b/consensus/src/test_helpers.rs @@ -1,4 +1,8 @@ +use std::fs::File; +use std::io::Write; + use kaspa_consensus_core::{ + BlockHashSet, block::Block, header::Header, subnets::SubnetworkId, @@ -159,4 +163,103 @@ pub fn generate_random_transaction_outpoint(rng: &mut SmallRng) -> TransactionOu TransactionOutpoint::new(generate_random_hash(rng), rng.r#gen()) } -//TODO: create `assert_eq_!()` helper macros in `consensus::test_helpers` +//TODO: create `assert_eq_!()` helper macros in `consensus::test_helpers` +/// Utility to output a JSON representation of a DAG +pub fn dag_to_json(genesis: u64, blocks: &[(u64, Vec)]) -> serde_json::Value { + let mut dag_data = serde_json::Map::new(); + dag_data.insert("genesis".to_string(), serde_json::Value::Number(genesis.into())); + + let blocks_array: Vec = blocks + .iter() + .map(|(block, parents)| { + let mut block_obj = serde_json::Map::new(); + block_obj.insert("id".to_string(), serde_json::Value::Number((*block).into())); + block_obj.insert( + "parents".to_string(), + serde_json::Value::Array(parents.iter().map(|p| serde_json::Value::Number((*p).into())).collect()), + ); + serde_json::Value::Object(block_obj) + }) + .collect(); + + dag_data.insert("blocks".to_string(), serde_json::Value::Array(blocks_array)); + serde_json::Value::Object(dag_data) +} + +/// Utility to output a DOT/Graphviz representation of a DAG +pub fn dag_to_dot(genesis: u64, blocks: &Vec<(u64, Vec)>) -> String { + let mut dot = String::from("digraph DAG {\n"); + dot.push_str(&format!(" {} [shape=doublecircle];\n", genesis)); + for (block, parents) in blocks { + dot.push_str(&format!(" {} [shape=circle];\n", block)); + for parent in parents { + dot.push_str(&format!(" {} -> {};\n", block, parent)); + } + } + dot.push_str("}\n"); + dot +} + +pub fn generate_dot_with_chain( + blocks: &[(u64, Vec)], + chain_nodes: &BlockHashSet, + reds: BlockHashSet, + base_name: &str, +) -> std::io::Result<()> { + let dot_filename = format!("{}.dot", base_name); + let mut dot_file = File::create(&dot_filename)?; + + // Write DOT header + writeln!(dot_file, "digraph DAG {{")?; + writeln!(dot_file, " // Graph settings")?; + writeln!(dot_file, " rankdir=TB;")?; + writeln!(dot_file, " node [fontname=\"Arial\", fontsize=10];")?; + writeln!(dot_file, " edge [fontname=\"Arial\", fontsize=8];")?; + writeln!(dot_file)?; + + // Write node definitions + for (block_id, _) in blocks { + let block_hash = Hash::from_u64_word(*block_id); + if chain_nodes.contains(&block_hash) { + // Chain nodes get double circle + writeln!( + dot_file, + " {} [shape=doublecircle, color=blue, style=filled, fillcolor=lightsteelblue, penwidth=2];", + block_id + )?; + } else if reds.contains(&block_hash) { + // Non-chain nodes get regular circle + writeln!(dot_file, " {} [shape=circle, style=filled, fillcolor=lightcoral];", block_id)?; + } else { + // Non-chain nodes get regular circle + writeln!(dot_file, " {} [shape=circle, style=filled, fillcolor=lightskyblue];", block_id)?; + } + } + + writeln!(dot_file)?; + + // Write edge definitions + for (block_id, parent_ids) in blocks { + let from_node = Hash::from_u64_word(*block_id); + + if parent_ids.is_empty() { + continue; + } + for &parent_id in parent_ids { + let to_node = Hash::from_u64_word(parent_id); + + if chain_nodes.contains(&from_node) && chain_nodes.contains(&to_node) { + // Chain edges get bold red + writeln!(dot_file, " {} -> {} [color=blue, penwidth=3, style=bold];", block_id, parent_id)?; + } else { + // Regular edges get gray dashed + writeln!(dot_file, " {} -> {} [color=gray, style=dashed];", block_id, parent_id)?; + } + } + } + + writeln!(dot_file, "}}")?; + + println!("Generated DOT file: {}", dot_filename); + Ok(()) +} From 18f0227914424b02554b957fe9a82afa6daeea5a Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Tue, 28 Oct 2025 22:42:46 -0600 Subject: [PATCH 5/8] Make Memory reachability and relations stores clone correctly Now cloning these stores would ensure that the underlying map is the same. Also adds is_chain_ancestor_of_all to ReachabilityService --- consensus/src/model/services/reachability.rs | 11 +++ consensus/src/model/stores/reachability.rs | 88 +++++++++++++------- consensus/src/model/stores/relations.rs | 43 ++++++---- 3 files changed, 93 insertions(+), 49 deletions(-) diff --git a/consensus/src/model/services/reachability.rs b/consensus/src/model/services/reachability.rs index 3c4f6864ff..78c0ceba20 100644 --- a/consensus/src/model/services/reachability.rs +++ b/consensus/src/model/services/reachability.rs @@ -13,6 +13,8 @@ pub trait ReachabilityService { /// Note that we use the graph theory convention here which defines that a block is also an ancestor of itself. fn is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> bool; + fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool; + /// Result version of [`Self::is_dag_ancestor_of`] (avoids unwrapping internally) fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result; @@ -46,6 +48,10 @@ impl ReachabilityService for T { inquirer::is_chain_ancestor_of(self, this, queried).unwrap() } + fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool { + queried.iter().all(|&hash| inquirer::is_chain_ancestor_of(self, this, hash).unwrap()) + } + fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result { inquirer::is_dag_ancestor_of(self, this, queried) } @@ -102,6 +108,11 @@ impl ReachabilityService for MTReachability inquirer::is_chain_ancestor_of(read_guard.deref(), this, queried).unwrap() } + fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool { + let read_guard = self.store.read(); + queried.iter().all(|&hash| inquirer::is_chain_ancestor_of(read_guard.deref(), this, hash).unwrap()) + } + fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result { let read_guard = self.store.read(); inquirer::is_dag_ancestor_of(read_guard.deref(), this, queried) diff --git a/consensus/src/model/stores/reachability.rs b/consensus/src/model/stores/reachability.rs index 10083a2f87..ffe173c4d9 100644 --- a/consensus/src/model/stores/reachability.rs +++ b/consensus/src/model/stores/reachability.rs @@ -13,7 +13,7 @@ use kaspa_hashes::Hash; use itertools::Itertools; use kaspa_utils::mem_size::MemSizeEstimator; -use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard}; +use parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; use rocksdb::WriteBatch; use serde::{Deserialize, Serialize}; use std::{ @@ -642,8 +642,9 @@ impl MemoryReachabilityData { } } +#[derive(Clone)] pub struct MemoryReachabilityStore { - map: BlockHashMap, + map: Arc>>, reindex_root: Option, } @@ -655,21 +656,7 @@ impl Default for MemoryReachabilityStore { impl MemoryReachabilityStore { pub fn new() -> Self { - Self { map: BlockHashMap::new(), reindex_root: None } - } - - fn get_data_mut(&mut self, hash: Hash) -> Result<&mut MemoryReachabilityData, StoreError> { - match self.map.get_mut(&hash) { - Some(data) => Ok(data), - None => Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash))), - } - } - - fn get_data(&self, hash: Hash) -> Result<&MemoryReachabilityData, StoreError> { - match self.map.get(&hash) { - Some(data) => Ok(data), - None => Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash))), - } + Self { map: Arc::new(RwLock::new(BlockHashMap::new())), reindex_root: None } } } @@ -681,7 +668,8 @@ impl ReachabilityStore for MemoryReachabilityStore { } fn insert(&mut self, hash: Hash, parent: Hash, interval: Interval, height: u64) -> Result<(), StoreError> { - if let Vacant(e) = self.map.entry(hash) { + let mut map = self.map.write(); + if let Vacant(e) = map.entry(hash) { e.insert(MemoryReachabilityData::new(parent, interval, height)); Ok(()) } else { @@ -690,25 +678,37 @@ impl ReachabilityStore for MemoryReachabilityStore { } fn set_interval(&mut self, hash: Hash, interval: Interval) -> Result<(), StoreError> { - let data = self.get_data_mut(hash)?; + let mut map = self.map.write(); + let data = map + .get_mut(&hash) + .ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; data.interval = interval; Ok(()) } fn append_child(&mut self, hash: Hash, child: Hash) -> Result<(), StoreError> { - let data = self.get_data_mut(hash)?; + let mut map = self.map.write(); + let data = map + .get_mut(&hash) + .ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; Arc::make_mut(&mut data.children).push(child); Ok(()) } fn insert_future_covering_item(&mut self, hash: Hash, fci: Hash, insertion_index: usize) -> Result<(), StoreError> { - let data = self.get_data_mut(hash)?; + let mut map = self.map.write(); + let data = map + .get_mut(&hash) + .ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; Arc::make_mut(&mut data.future_covering_set).insert(insertion_index, fci); Ok(()) } fn set_parent(&mut self, hash: Hash, new_parent: Hash) -> Result<(), StoreError> { - let data = self.get_data_mut(hash)?; + let mut map = self.map.write(); + let data = map + .get_mut(&hash) + .ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; data.parent = new_parent; Ok(()) } @@ -720,7 +720,10 @@ impl ReachabilityStore for MemoryReachabilityStore { replaced_index: usize, replace_with: &[Hash], ) -> Result<(), StoreError> { - let data = self.get_data_mut(hash)?; + let mut map = self.map.write(); + let data = map + .get_mut(&hash) + .ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; let removed_hash = Arc::make_mut(&mut data.children).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); debug_assert_eq!(replaced_hash, removed_hash.exactly_one().unwrap()); Ok(()) @@ -733,7 +736,10 @@ impl ReachabilityStore for MemoryReachabilityStore { replaced_index: usize, replace_with: &[Hash], ) -> Result<(), StoreError> { - let data = self.get_data_mut(hash)?; + let mut map = self.map.write(); + let data = map + .get_mut(&hash) + .ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; let removed_hash = Arc::make_mut(&mut data.future_covering_set).splice(replaced_index..replaced_index + 1, replace_with.iter().copied()); debug_assert_eq!(replaced_hash, removed_hash.exactly_one().unwrap()); @@ -741,12 +747,16 @@ impl ReachabilityStore for MemoryReachabilityStore { } fn delete(&mut self, hash: Hash) -> Result<(), StoreError> { - self.map.remove(&hash); + let mut map = self.map.write(); + map.remove(&hash); Ok(()) } fn get_height(&self, hash: Hash) -> Result { - Ok(self.get_data(hash)?.height) + let map = self.map.read(); + let data = + map.get(&hash).ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; + Ok(data.height) } fn set_reindex_root(&mut self, root: Hash) -> Result<(), StoreError> { @@ -764,27 +774,41 @@ impl ReachabilityStore for MemoryReachabilityStore { impl ReachabilityStoreReader for MemoryReachabilityStore { fn has(&self, hash: Hash) -> Result { - Ok(self.map.contains_key(&hash)) + let map = self.map.read(); + Ok(map.contains_key(&hash)) } fn get_interval(&self, hash: Hash) -> Result { - Ok(self.get_data(hash)?.interval) + let map = self.map.read(); + let data = + map.get(&hash).ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; + Ok(data.interval) } fn get_parent(&self, hash: Hash) -> Result { - Ok(self.get_data(hash)?.parent) + let map = self.map.read(); + let data = + map.get(&hash).ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; + Ok(data.parent) } fn get_children(&self, hash: Hash) -> Result { - Ok(Arc::clone(&self.get_data(hash)?.children)) + let map = self.map.read(); + let data = + map.get(&hash).ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; + Ok(Arc::clone(&data.children)) } fn get_future_covering_set(&self, hash: Hash) -> Result { - Ok(Arc::clone(&self.get_data(hash)?.future_covering_set)) + let map = self.map.read(); + let data = + map.get(&hash).ok_or_else(|| StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::Reachability.as_ref(), hash)))?; + Ok(Arc::clone(&data.future_covering_set)) } fn count(&self) -> Result { - Ok(self.map.len()) + let map = self.map.read(); + Ok(map.len()) } } diff --git a/consensus/src/model/stores/relations.rs b/consensus/src/model/stores/relations.rs index 456badb35e..493baad534 100644 --- a/consensus/src/model/stores/relations.rs +++ b/consensus/src/model/stores/relations.rs @@ -1,6 +1,6 @@ use itertools::Itertools; use kaspa_consensus_core::BlockHashSet; -use kaspa_consensus_core::{BlockHashMap, BlockHasher, BlockLevel, blockhash::BlockHashes}; +use kaspa_consensus_core::{BlockHashMap, BlockHasher, BlockLevel, HashMapCustomHasher, blockhash::BlockHashes}; use kaspa_database::prelude::{BatchDbWriter, CachePolicy, DbWriter}; use kaspa_database::prelude::{CachedDbAccess, DbKey, DirectDbWriter}; use kaspa_database::prelude::{DB, StoreResult}; @@ -12,7 +12,7 @@ use rocksdb::WriteBatch; use std::collections::HashSet; use std::collections::hash_map::Entry; use std::iter::once; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use super::children::{ChildrenStore, ChildrenStoreReader, DbChildrenStore}; @@ -348,32 +348,34 @@ impl RelationsStoreReader for StagingRelationsStore<'_> { } } -#[derive(Default)] +#[derive(Default, Clone)] pub struct MemoryRelationsStore { - parents_map: BlockHashMap, - children_map: BlockHashMap, + parents_map: Arc>>, + children_map: Arc>>, } impl MemoryRelationsStore { pub fn new() -> Self { - Default::default() + Self { parents_map: Arc::new(RwLock::new(BlockHashMap::new())), children_map: Arc::new(RwLock::new(BlockHashMap::new())) } } } impl ChildrenStore for MemoryRelationsStore { fn insert_child(&mut self, _writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> { - let mut children = match self.children_map.get(&parent) { + let mut children_map = self.children_map.write().unwrap(); + let mut children = match children_map.get(&parent) { Some(children) => children.iter().copied().collect_vec(), None => vec![], }; children.push(child); - self.children_map.insert(parent, children.into()); + children_map.insert(parent, children.into()); Ok(()) } fn delete_child(&mut self, _writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> { - let mut children = match self.children_map.get(&parent) { + let mut children_map = self.children_map.write().unwrap(); + let mut children = match children_map.get(&parent) { Some(children) => children.iter().copied().collect_vec(), None => vec![], }; @@ -383,14 +385,15 @@ impl ChildrenStore for MemoryRelationsStore { }; children.remove(to_remove_idx); - self.children_map.insert(parent, children.into()); + children_map.insert(parent, children.into()); Ok(()) } } impl RelationsStoreReader for MemoryRelationsStore { fn get_parents(&self, hash: Hash) -> Result { - match self.parents_map.get(&hash) { + let parents_map = self.parents_map.read().unwrap(); + match parents_map.get(&hash) { Some(parents) => Ok(BlockHashes::clone(parents)), None => Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::RelationsParents.as_ref(), hash))), } @@ -400,7 +403,8 @@ impl RelationsStoreReader for MemoryRelationsStore { if !self.has(hash)? { Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::RelationsChildren.as_ref(), hash))) } else { - match self.children_map.get(&hash) { + let children_map = self.children_map.read().unwrap(); + match children_map.get(&hash) { Some(children) => Ok(BlockHashSet::from_iter(children.iter().copied()).into()), None => Ok(Default::default()), } @@ -408,11 +412,13 @@ impl RelationsStoreReader for MemoryRelationsStore { } fn has(&self, hash: Hash) -> Result { - Ok(self.parents_map.contains_key(&hash)) + let parents_map = self.parents_map.read().unwrap(); + Ok(parents_map.contains_key(&hash)) } fn counts(&self) -> Result<(usize, usize), StoreError> { - let count = self.parents_map.len(); + let parents_map = self.parents_map.read().unwrap(); + let count = parents_map.len(); Ok((count, count)) } } @@ -425,13 +431,16 @@ impl RelationsStore for MemoryRelationsStore { } fn set_parents(&mut self, _writer: impl DbWriter, hash: Hash, parents: BlockHashes) -> Result<(), StoreError> { - self.parents_map.insert(hash, parents); + let mut parents_map = self.parents_map.write().unwrap(); + parents_map.insert(hash, parents); Ok(()) } fn delete_entries(&mut self, _writer: impl DbWriter, hash: Hash) -> Result<(), StoreError> { - self.parents_map.remove(&hash); - self.children_map.remove(&hash); + let mut parents_map = self.parents_map.write().unwrap(); + let mut children_map = self.children_map.write().unwrap(); + parents_map.remove(&hash); + children_map.remove(&hash); Ok(()) } } From 9fdeb3b81cd6c25ba29ebb391e6d81a9fdb68760 Mon Sep 17 00:00:00 2001 From: Maxim <59533214+biryukovmaxim@users.noreply.github.com> Date: Tue, 20 Jan 2026 21:11:58 +0300 Subject: [PATCH 6/8] Add reachability try helpers Co-authored-by: coderofstuff <114628839+coderofstuff@users.noreply.github.com> --- consensus/src/model/services/reachability.rs | 59 ++++++++----------- .../pipeline/pruning_processor/processor.rs | 4 +- 2 files changed, 27 insertions(+), 36 deletions(-) diff --git a/consensus/src/model/services/reachability.rs b/consensus/src/model/services/reachability.rs index 78c0ceba20..581bf2e96f 100644 --- a/consensus/src/model/services/reachability.rs +++ b/consensus/src/model/services/reachability.rs @@ -11,26 +11,35 @@ use kaspa_hashes::Hash; pub trait ReachabilityService { /// Checks if `this` block is a chain ancestor of `queried` block (i.e., `this ∈ chain(queried) ∪ {queried}`). /// Note that we use the graph theory convention here which defines that a block is also an ancestor of itself. - fn is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> bool; + fn is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> bool { + self.try_is_chain_ancestor_of(this, queried).unwrap() + } + + /// Result version of [`Self::is_chain_ancestor_of`] (avoids unwrapping internally) + fn try_is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> Result; fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool; /// Result version of [`Self::is_dag_ancestor_of`] (avoids unwrapping internally) - fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result; + fn try_is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> Result; /// Returns true if `this` is a DAG ancestor of `queried` (i.e., `queried ∈ future(this) ∪ {this}`). /// Note: this method will return true if `this == queried`. /// The complexity of this method is `O(log(|future_covering_set(this)|))` - fn is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> bool; + fn is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> bool { + self.try_is_dag_ancestor_of(this, queried).unwrap() + } /// Checks if `this` is DAG ancestor of any of the blocks in `queried`. See [`Self::is_dag_ancestor_of`] as well. fn is_dag_ancestor_of_any(&self, this: Hash, queried: &mut impl Iterator) -> bool; /// Checks if any of the blocks in `list` is DAG ancestor of `queried`. See [`Self::is_dag_ancestor_of`] as well. - fn is_any_dag_ancestor(&self, list: &mut impl Iterator, queried: Hash) -> bool; + fn is_any_dag_ancestor(&self, list: &mut impl Iterator, queried: Hash) -> bool { + self.try_is_any_dag_ancestor(list, queried).unwrap() + } /// Result version of [`Self::is_any_dag_ancestor`] (avoids unwrapping internally) - fn is_any_dag_ancestor_result(&self, list: &mut impl Iterator, queried: Hash) -> Result; + fn try_is_any_dag_ancestor(&self, list: &mut impl Iterator, queried: Hash) -> Result; /// Finds the tree child of `ancestor` which is also a chain ancestor of `descendant`. /// (A "tree child of X" is a block which X is its chain parent) @@ -44,31 +53,23 @@ pub trait ReachabilityService { } impl ReachabilityService for T { - fn is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> bool { - inquirer::is_chain_ancestor_of(self, this, queried).unwrap() + fn try_is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> Result { + inquirer::is_chain_ancestor_of(self, this, queried) } fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool { queried.iter().all(|&hash| inquirer::is_chain_ancestor_of(self, this, hash).unwrap()) } - fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result { + fn try_is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> Result { inquirer::is_dag_ancestor_of(self, this, queried) } - fn is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> bool { - inquirer::is_dag_ancestor_of(self, this, queried).unwrap() - } - fn is_dag_ancestor_of_any(&self, this: Hash, queried: &mut impl Iterator) -> bool { queried.any(|hash| inquirer::is_dag_ancestor_of(self, this, hash).unwrap()) } - fn is_any_dag_ancestor(&self, list: &mut impl Iterator, queried: Hash) -> bool { - list.any(|hash| inquirer::is_dag_ancestor_of(self, hash, queried).unwrap()) - } - - fn is_any_dag_ancestor_result(&self, list: &mut impl Iterator, queried: Hash) -> Result { + fn try_is_any_dag_ancestor(&self, list: &mut impl Iterator, queried: Hash) -> Result { for hash in list { if inquirer::is_dag_ancestor_of(self, hash, queried)? { return Ok(true); @@ -103,9 +104,9 @@ impl MTReachabilityService { } impl ReachabilityService for MTReachabilityService { - fn is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> bool { + fn try_is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> Result { let read_guard = self.store.read(); - inquirer::is_chain_ancestor_of(read_guard.deref(), this, queried).unwrap() + inquirer::is_chain_ancestor_of(read_guard.deref(), this, queried) } fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool { @@ -113,30 +114,20 @@ impl ReachabilityService for MTReachability queried.iter().all(|&hash| inquirer::is_chain_ancestor_of(read_guard.deref(), this, hash).unwrap()) } - fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result { + fn try_is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> Result { let read_guard = self.store.read(); inquirer::is_dag_ancestor_of(read_guard.deref(), this, queried) } - fn is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> bool { - let read_guard = self.store.read(); - inquirer::is_dag_ancestor_of(read_guard.deref(), this, queried).unwrap() - } - - fn is_any_dag_ancestor(&self, list: &mut impl Iterator, queried: Hash) -> bool { - let read_guard = self.store.read(); - list.any(|hash| inquirer::is_dag_ancestor_of(read_guard.deref(), hash, queried).unwrap()) - } - - fn is_any_dag_ancestor_result(&self, list: &mut impl Iterator, queried: Hash) -> Result { - self.store.read().is_any_dag_ancestor_result(list, queried) - } - fn is_dag_ancestor_of_any(&self, this: Hash, queried: &mut impl Iterator) -> bool { let read_guard = self.store.read(); queried.any(|hash| inquirer::is_dag_ancestor_of(read_guard.deref(), this, hash).unwrap()) } + fn try_is_any_dag_ancestor(&self, list: &mut impl Iterator, queried: Hash) -> Result { + self.store.read().try_is_any_dag_ancestor(list, queried) + } + fn get_next_chain_ancestor(&self, descendant: Hash, ancestor: Hash) -> Hash { let read_guard = self.store.read(); inquirer::get_next_chain_ancestor(read_guard.deref(), descendant, ancestor).unwrap() diff --git a/consensus/src/pipeline/pruning_processor/processor.rs b/consensus/src/pipeline/pruning_processor/processor.rs index fcd51ab8d2..661a37ae9f 100644 --- a/consensus/src/pipeline/pruning_processor/processor.rs +++ b/consensus/src/pipeline/pruning_processor/processor.rs @@ -407,7 +407,7 @@ impl PruningProcessor { .read() .iter() .copied() - .filter(|&h| !reachability_read.is_dag_ancestor_of_result(new_pruning_point, h).unwrap()) + .filter(|&h| !reachability_read.try_is_dag_ancestor_of(new_pruning_point, h).unwrap()) .collect_vec(); tips_write.prune_tips_with_writer(BatchDbWriter::new(&mut batch), &pruned_tips).unwrap(); if !pruned_tips.is_empty() { @@ -444,7 +444,7 @@ impl PruningProcessor { let (mut counter, mut traversed) = (0, 0); info!("Header and Block pruning: starting traversal from: {} (genesis: {})", queue.iter().reusable_format(", "), genesis); while let Some(current) = queue.pop_front() { - if reachability_read.is_dag_ancestor_of_result(retention_period_root, current).unwrap() { + if reachability_read.try_is_dag_ancestor_of(retention_period_root, current).unwrap() { continue; } traversed += 1; From 6e1bc9e54388ac349f480aed8e725880dfce2e06 Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Mon, 26 Jan 2026 22:09:24 -0700 Subject: [PATCH 7/8] Align is_chain_ancestor_of_all impl to is_chain_ancestor_of --- consensus/src/model/services/reachability.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/consensus/src/model/services/reachability.rs b/consensus/src/model/services/reachability.rs index 581bf2e96f..cdfe07bbb4 100644 --- a/consensus/src/model/services/reachability.rs +++ b/consensus/src/model/services/reachability.rs @@ -15,11 +15,15 @@ pub trait ReachabilityService { self.try_is_chain_ancestor_of(this, queried).unwrap() } + /// Checks if `this` block is a chain ancestor of all blocks in `queried`. + /// See [`Self::is_chain_ancestor_of`] as well. + fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool { + queried.iter().all(|&hash| self.try_is_chain_ancestor_of(this, hash).unwrap()) + } + /// Result version of [`Self::is_chain_ancestor_of`] (avoids unwrapping internally) fn try_is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> Result; - fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool; - /// Result version of [`Self::is_dag_ancestor_of`] (avoids unwrapping internally) fn try_is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> Result; @@ -57,10 +61,6 @@ impl ReachabilityService for T { inquirer::is_chain_ancestor_of(self, this, queried) } - fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool { - queried.iter().all(|&hash| inquirer::is_chain_ancestor_of(self, this, hash).unwrap()) - } - fn try_is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> Result { inquirer::is_dag_ancestor_of(self, this, queried) } @@ -109,11 +109,6 @@ impl ReachabilityService for MTReachability inquirer::is_chain_ancestor_of(read_guard.deref(), this, queried) } - fn is_chain_ancestor_of_all(&self, this: Hash, queried: &[Hash]) -> bool { - let read_guard = self.store.read(); - queried.iter().all(|&hash| inquirer::is_chain_ancestor_of(read_guard.deref(), this, hash).unwrap()) - } - fn try_is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> Result { let read_guard = self.store.read(); inquirer::is_dag_ancestor_of(read_guard.deref(), this, queried) From 96d31e553ce2fe2dcb1fce6bd8f51f7151bca2a5 Mon Sep 17 00:00:00 2001 From: coderofstuff <114628839+coderofstuff@users.noreply.github.com> Date: Tue, 28 Oct 2025 22:48:09 -0600 Subject: [PATCH 8/8] Implement add_block_with_selected_parent for DagBuilder --- .../src/processes/reachability/tests/mod.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/consensus/src/processes/reachability/tests/mod.rs b/consensus/src/processes/reachability/tests/mod.rs index 95070bc664..faca9d773e 100644 --- a/consensus/src/processes/reachability/tests/mod.rs +++ b/consensus/src/processes/reachability/tests/mod.rs @@ -148,9 +148,25 @@ impl<'a, T: ReachabilityStore + ?Sized, S: RelationsStore + ChildrenStore + ?Siz self } + pub fn add_block_with_selected_parent(&mut self, mut block: DagBlock, selected_parent: Hash) -> &mut Self { + if block.parents.is_empty() { + block.parents.push(ORIGIN); + } + assert!(block.parents.contains(&selected_parent)); + let mergeset = unordered_mergeset_without_selected_parent(self.relations, self.reachability, selected_parent, &block.parents); + add_block(self.reachability, block.hash, selected_parent, &mut mergeset.iter().cloned()).unwrap(); + hint_virtual_selected_parent(self.reachability, block.hash).unwrap(); + self.relations.insert(block.hash, BlockHashes::new(block.parents)).unwrap(); + self + } + pub fn store(&self) -> &&'a mut T { &self.reachability } + + pub fn relations(&self) -> &&'a mut S { + &self.relations + } } /// Validates that relations are consistent and do not contain any dangling hash etc