From a7ae675628e27e75ad47638c5b52862306857f5b Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 20 Jan 2026 09:11:11 -0300 Subject: [PATCH 1/6] feat: use phased imports for better performance --- .ongoing/phased-mithril-import.md | 43 +++++ crates/cardano/src/roll/txs.rs | 63 +++++++ crates/core/src/facade.rs | 94 ++++++++++ src/bin/dolos/bootstrap/mithril/archive.rs | 129 ++++++++++++++ .../{mithril.rs => mithril/helpers.rs} | 160 +++--------------- src/bin/dolos/bootstrap/mithril/mod.rs | 139 +++++++++++++++ src/bin/dolos/bootstrap/mithril/state.rs | 60 +++++++ 7 files changed, 550 insertions(+), 138 deletions(-) create mode 100644 .ongoing/phased-mithril-import.md create mode 100644 src/bin/dolos/bootstrap/mithril/archive.rs rename src/bin/dolos/bootstrap/{mithril.rs => mithril/helpers.rs} (50%) create mode 100644 src/bin/dolos/bootstrap/mithril/mod.rs create mode 100644 src/bin/dolos/bootstrap/mithril/state.rs diff --git a/.ongoing/phased-mithril-import.md b/.ongoing/phased-mithril-import.md new file mode 100644 index 00000000..bf27185e --- /dev/null +++ b/.ongoing/phased-mithril-import.md @@ -0,0 +1,43 @@ +# Phased Mithril Import Plan + +## Goal +Split Mithril bootstrap into two passes (state, archive), skip WAL during bootstrap, and make archive import fast and independent of state. + +## Work Completed +- Split Mithril bootstrap into modules: + - `src/bin/dolos/bootstrap/mithril/mod.rs` (facade: CLI args + run) + - `src/bin/dolos/bootstrap/mithril/helpers.rs` (snapshot download + starting point helpers) + - `src/bin/dolos/bootstrap/mithril/state.rs` (state pass) + - `src/bin/dolos/bootstrap/mithril/archive.rs` (archive pass) +- Added state-only import in core: + - `crates/core/src/facade.rs` now exposes `import_blocks_state_only` with state-only batch execution helpers (no archive commit). +- Updated slot tag collection to accept resolved inputs: + - `crates/cardano/src/roll/txs.rs` signature is now `collect_slot_tags_from_block(block, resolved_inputs, tags)`. + - Uses resolved inputs to index input-derived tags (addresses, assets, datums). +- Archive pass uses an in-memory UTxO cache (no disk cache): + - Insert outputs into cache (decoded once). + - Build tags using resolved inputs from cache (skip missing inputs). + - Apply to archive. + - Remove consumed inputs from cache. +- Archive pass no longer resolves inputs from archive on-demand. +- State pass still uses `import_blocks_state_only`. +- CLI flags `--state-only` and `--archive-only` remain in the facade. + +## Key Decisions +- WAL is skipped entirely during bootstrap. +- Archive pass uses only tx logs/tags with in-memory UTxO cache. +- Missing inputs during archive pass are skipped (no error). +- No cache size limits yet (memory-heavy acceptable for now). + +## Files Touched +- `src/bin/dolos/bootstrap/mithril/mod.rs` +- `src/bin/dolos/bootstrap/mithril/helpers.rs` +- `src/bin/dolos/bootstrap/mithril/state.rs` +- `src/bin/dolos/bootstrap/mithril/archive.rs` +- `crates/core/src/facade.rs` +- `crates/cardano/src/roll/txs.rs` + +## Notes +- Compile checks ran successfully after refactor. +- UTxO cache is archive-only and not reused in helpers. +- `helpers.rs` is the preferred naming over `common.rs`. 
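For orientation before the diffs, a minimal, self-contained sketch of the archive-pass flow the plan describes is shown below. Every type in it is a simplified stand-in (plain byte vectors instead of `OwnedMultiEraOutput`, a toy `Block`/`Tx` instead of pallas' `MultiEraBlock`, and `apply_to_archive` standing in for `collect_slot_tags_from_block` plus the archive writer); only the insert/resolve/apply/evict flow mirrors the actual implementation in the patches.

```rust
// Toy sketch of the archive pass: cache produced outputs, resolve inputs from
// the cache (missing ones are skipped), build/apply archive data, then evict
// the consumed inputs. Types are illustrative stand-ins, not the real dolos
// or pallas types.
use std::collections::HashMap;

type TxoRef = ([u8; 32], u32); // (tx hash, output index)

struct Tx {
    hash: [u8; 32],
    inputs: Vec<TxoRef>,
    outputs: Vec<Vec<u8>>, // raw CBOR bodies in the real code
}

struct Block {
    slot: u64,
    txs: Vec<Tx>,
}

#[derive(Default)]
struct UtxoCache {
    entries: HashMap<TxoRef, Vec<u8>>,
}

impl UtxoCache {
    // 1. cache every output produced by the block (decoded once in the real code)
    fn insert_block_outputs(&mut self, block: &Block) {
        for tx in &block.txs {
            for (idx, body) in tx.outputs.iter().enumerate() {
                self.entries.insert((tx.hash, idx as u32), body.clone());
            }
        }
    }

    // 2. resolve the block's inputs from the cache; missing inputs are skipped
    fn resolve_block_inputs(&self, block: &Block) -> HashMap<TxoRef, Vec<u8>> {
        let mut resolved = HashMap::new();
        for tx in &block.txs {
            for input in &tx.inputs {
                if let Some(body) = self.entries.get(input) {
                    resolved.insert(*input, body.clone());
                }
            }
        }
        resolved
    }

    // 4. evict the inputs consumed by the block once the archive data is written
    fn remove_block_inputs(&mut self, block: &Block) {
        for tx in &block.txs {
            for input in &tx.inputs {
                self.entries.remove(input);
            }
        }
    }
}

// 3. stand-in for collect_slot_tags_from_block + the archive writer: build the
// slot tags from the block and its resolved inputs, then persist them.
fn apply_to_archive(block: &Block, resolved_inputs: &HashMap<TxoRef, Vec<u8>>) {
    let _ = (block.slot, resolved_inputs.len()); // real code writes tags + raw block
}

fn archive_pass(blocks: &[Block]) {
    let mut utxos = UtxoCache::default();
    for block in blocks {
        utxos.insert_block_outputs(block);
        let resolved = utxos.resolve_block_inputs(block);
        apply_to_archive(block, &resolved);
        utxos.remove_block_inputs(block);
    }
}
```

The real implementation follows the same ordering per block; the diffs below add the concrete `UtxoCache` (first `HashMap`-backed, later redb in-memory) and wire the flow into `import_hardano` for the archive store.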
diff --git a/crates/cardano/src/roll/txs.rs b/crates/cardano/src/roll/txs.rs index ca8dfede..ea1b0b6d 100644 --- a/crates/cardano/src/roll/txs.rs +++ b/crates/cardano/src/roll/txs.rs @@ -86,6 +86,69 @@ fn unpack_cert(tags: &mut SlotTags, cert: &MultiEraCert) { } } +pub fn collect_slot_tags_from_block( + block: &MultiEraBlock, + resolved_inputs: &HashMap, + tags: &mut SlotTags, +) -> Result<(), ChainError> { + tags.number = Some(block.number()); + + for tx in block.txs() { + tags.tx_hashes.push(tx.hash().to_vec()); + + for (key, _) in tx.metadata().collect::>() { + tags.metadata.push(key); + } + + for input in tx.consumes() { + let txoref: TxoRef = (&input).into(); + tags.spent_txo.push(txoref.clone().into()); + + if let Some(resolved) = resolved_inputs.get(&txoref) { + resolved.with_dependent(|_, resolved| { + if let Ok(address) = resolved.address() { + unpack_address(tags, &address); + } + + unpack_assets(tags, &resolved.value()); + + if let Some(datum) = resolved.datum() { + unpack_datum(tags, &datum); + } + + Result::<_, ChainError>::Ok(()) + })?; + } + } + + for (_, output) in tx.produces() { + if let Ok(address) = output.address() { + unpack_address(tags, &address); + } + + unpack_assets(tags, &output.value()); + + if let Some(datum) = output.datum() { + unpack_datum(tags, &datum); + } + } + + for datum in tx.plutus_data() { + tags.datums.push(datum.original_hash().to_vec()); + } + + for cert in tx.certs() { + unpack_cert(tags, &cert); + } + + for redeemer in tx.redeemers() { + tags.datums.push(redeemer.data().compute_hash().to_vec()); + } + } + + Ok(()) +} + impl BlockVisitor for TxLogVisitor { fn visit_root( &mut self, diff --git a/crates/core/src/facade.rs b/crates/core/src/facade.rs index 439c944d..6b10f6e1 100644 --- a/crates/core/src/facade.rs +++ b/crates/core/src/facade.rs @@ -36,6 +36,32 @@ fn execute_batch( Ok(batch.last_slot()) } +fn execute_batch_state_only( + chain: &D::Chain, + domain: &D, + batch: &mut WorkBatch, + with_wal: bool, +) -> Result { + batch.load_utxos(domain)?; + + batch.decode_utxos(chain)?; + + // Chain-specific logic + chain.compute_delta::(domain.state(), domain.genesis(), batch)?; + + if with_wal { + batch.commit_wal(domain)?; + } + + batch.load_entities(domain)?; + + batch.apply_entities()?; + + batch.commit_state(domain)?; + + Ok(batch.last_slot()) +} + fn notify_work(domain: &D, work: &WorkUnit) { let WorkUnit::Blocks(batch) = work else { return; @@ -84,6 +110,41 @@ fn execute_work( Ok(()) } +fn execute_work_state_only( + chain: &mut D::Chain, + domain: &D, + work: &mut WorkUnit, + live: bool, +) -> Result<(), DomainError> { + match work { + WorkUnit::Genesis => { + chain.apply_genesis::(domain.state(), domain.genesis())?; + domain.wal().reset_to(&ChainPoint::Origin)?; + } + WorkUnit::EWrap(slot) => { + chain.apply_ewrap::(domain.state(), domain.archive(), domain.genesis(), *slot)?; + } + WorkUnit::EStart(slot) => { + chain.apply_estart::(domain.state(), domain.archive(), domain.genesis(), *slot)?; + } + WorkUnit::Rupd(slot) => { + chain.apply_rupd::(domain.state(), domain.archive(), domain.genesis(), *slot)?; + } + WorkUnit::Blocks(batch) => { + execute_batch_state_only(chain, domain, batch, live)?; + } + WorkUnit::ForcedStop => { + return Err(DomainError::StopEpochReached); + } + }; + + if live { + notify_work(domain, work); + } + + Ok(()) +} + async fn drain_pending_work( chain: &mut D::Chain, domain: &D, @@ -96,6 +157,18 @@ async fn drain_pending_work( Ok(()) } +async fn drain_pending_work_state_only( + chain: &mut D::Chain, + domain: &D, + 
live: bool, +) -> Result<(), DomainError> { + while let Some(mut work) = chain.pop_work() { + execute_work_state_only(chain, domain, &mut work, live)?; + } + + Ok(()) +} + pub async fn import_blocks( domain: &D, mut raw: Vec, @@ -117,6 +190,27 @@ pub async fn import_blocks( Ok(last) } +pub async fn import_blocks_state_only( + domain: &D, + mut raw: Vec, +) -> Result { + let mut last = 0; + let mut chain = domain.write_chain().await; + + for block in raw.drain(..) { + if !chain.can_receive_block() { + drain_pending_work_state_only(&mut *chain, domain, false).await?; + } + + last = chain.receive_block(block)?; + } + + // one last drain to ensure we're up to date + drain_pending_work_state_only(&mut *chain, domain, false).await?; + + Ok(last) +} + pub async fn roll_forward( domain: &D, block: RawBlock, diff --git a/src/bin/dolos/bootstrap/mithril/archive.rs b/src/bin/dolos/bootstrap/mithril/archive.rs new file mode 100644 index 00000000..30d414d5 --- /dev/null +++ b/src/bin/dolos/bootstrap/mithril/archive.rs @@ -0,0 +1,129 @@ +use miette::{Context, IntoDiagnostic}; +use std::collections::HashMap; +use std::path::Path; +use std::sync::Arc; + +use itertools::Itertools; + +use dolos::prelude::*; +use dolos_cardano::{owned::OwnedMultiEraOutput, roll::txs::collect_slot_tags_from_block}; +use pallas::ledger::traverse::MultiEraBlock; + +use crate::feedback::Feedback; +use dolos_core::config::RootConfig; + +use super::helpers::define_archive_starting_point; +use super::Args; + +#[derive(Default)] +struct UtxoCache { + entries: HashMap, +} + +impl UtxoCache { + fn insert_block_outputs(&mut self, block: &MultiEraBlock) -> miette::Result<()> { + for tx in block.txs() { + let tx_hash = tx.hash(); + + for (idx, output) in tx.produces() { + let txo_ref = TxoRef(tx_hash, idx as u32); + let eracbor: EraCbor = output.into(); + let resolved = OwnedMultiEraOutput::decode(Arc::new(eracbor)).into_diagnostic()?; + + self.entries.insert(txo_ref, resolved); + } + } + + Ok(()) + } + + fn remove_block_inputs(&mut self, block: &MultiEraBlock) { + for tx in block.txs() { + for input in tx.consumes() { + let txo_ref: TxoRef = (&input).into(); + self.entries.remove(&txo_ref); + } + } + } +} + +pub(crate) async fn import_hardano_into_archive( + args: &Args, + config: &RootConfig, + immutable_path: &Path, + feedback: &Feedback, + chunk_size: usize, +) -> Result<(), miette::Error> { + let domain = crate::common::setup_domain(config).await?; + + let tip = pallas::storage::hardano::immutable::get_tip(immutable_path) + .map_err(|err| miette::miette!(err.to_string())) + .context("reading immutable db tip")? 
+ .ok_or(miette::miette!("immutable db has no tip"))?; + + let cursor = define_archive_starting_point(args, domain.archive())?; + + let iter = pallas::storage::hardano::immutable::read_blocks_from_point(immutable_path, cursor) + .map_err(|err| miette::miette!(err.to_string())) + .context("reading immutable db tip")?; + + let progress = feedback.slot_progress_bar(); + + progress.set_message("importing immutable db (archive)"); + progress.set_length(tip.slot_or_default()); + + let mut utxos = UtxoCache::default(); + + for batch in iter.chunks(chunk_size).into_iter() { + let batch: Vec<_> = batch + .try_collect() + .into_diagnostic() + .context("reading block data")?; + + let writer = domain + .archive() + .start_writer() + .map_err(|err| miette::miette!(format!("{err:?}"))) + .context("starting archive writer")?; + + let mut last_slot = None; + + for raw in batch { + let raw: Arc = Arc::new(raw); + let block = dolos_cardano::owned::OwnedMultiEraBlock::decode(raw.clone()) + .into_diagnostic() + .context("decoding block")?; + let point = block.point(); + let view = block.view(); + + utxos.insert_block_outputs(view)?; + + let mut tags = SlotTags::default(); + collect_slot_tags_from_block(view, &utxos.entries, &mut tags) + .into_diagnostic() + .context("computing block tags")?; + + writer + .apply(&point, &raw, &tags) + .map_err(|err| miette::miette!(format!("{err:?}"))) + .context("writing archive block")?; + + utxos.remove_block_inputs(view); + + last_slot = Some(point.slot()); + } + + writer + .commit() + .map_err(|err| miette::miette!(format!("{err:?}"))) + .context("committing archive batch")?; + + if let Some(last_slot) = last_slot { + progress.set_position(last_slot); + } + } + + progress.abandon_with_message("immutable db archive import complete"); + + Ok(()) +} diff --git a/src/bin/dolos/bootstrap/mithril.rs b/src/bin/dolos/bootstrap/mithril/helpers.rs similarity index 50% rename from src/bin/dolos/bootstrap/mithril.rs rename to src/bin/dolos/bootstrap/mithril/helpers.rs index 0d3718b6..27145b24 100644 --- a/src/bin/dolos/bootstrap/mithril.rs +++ b/src/bin/dolos/bootstrap/mithril/helpers.rs @@ -1,61 +1,14 @@ -use itertools::Itertools; use miette::{Context, IntoDiagnostic}; use mithril_client::{ClientBuilder, MessageBuilder, MithrilError, MithrilResult}; use std::{path::Path, sync::Arc}; -use tracing::{info, warn}; +use tracing::warn; use dolos::prelude::*; use crate::feedback::Feedback; -use dolos_core::config::{MithrilConfig, RootConfig}; +use dolos_core::config::MithrilConfig; -#[derive(Debug, clap::Args, Clone)] -pub struct Args { - #[arg(long, default_value = "./snapshot")] - download_dir: String, - - /// Skip the bootstrap if there's already data in the stores - #[arg(long, action)] - skip_if_not_empty: bool, - - /// Skip the Mithril certificate validation - #[arg(long, action)] - skip_validation: bool, - - /// Assume the snapshot is already available in the download dir - #[arg(long, action)] - skip_download: bool, - - /// Retain downloaded snapshot instead of deleting it - #[arg(long, action)] - retain_snapshot: bool, - - /// Number of blocks to process in each chunk, more is faster but uses more - /// memory - #[arg(long, default_value = "500")] - chunk_size: usize, - - #[arg(long, action)] - verbose: bool, - - #[arg(long)] - start_from: Option, -} - -impl Default for Args { - fn default() -> Self { - Self { - download_dir: "./snapshot".to_string(), - skip_if_not_empty: Default::default(), - skip_validation: Default::default(), - skip_download: Default::default(), - 
retain_snapshot: Default::default(), - verbose: Default::default(), - chunk_size: 500, - start_from: None, - } - } -} +use super::Args; struct MithrilFeedback { download_pb: indicatif::ProgressBar, @@ -108,7 +61,7 @@ impl mithril_client::feedback::FeedbackReceiver for MithrilFeedback { } } -async fn fetch_snapshot( +pub(crate) async fn fetch_snapshot( args: &Args, config: &MithrilConfig, feedback: &Feedback, @@ -167,7 +120,7 @@ async fn fetch_snapshot( Ok(()) } -fn define_starting_point( +pub(crate) fn define_starting_point( args: &Args, state: &dolos_redb3::state::StateStore, ) -> Result { @@ -187,97 +140,28 @@ fn define_starting_point( } } -async fn import_hardano_into_domain( +pub(crate) fn define_archive_starting_point( args: &Args, - config: &RootConfig, - immutable_path: &Path, - feedback: &Feedback, - chunk_size: usize, -) -> Result<(), miette::Error> { - let domain = crate::common::setup_domain(config).await?; - - let tip = pallas::storage::hardano::immutable::get_tip(immutable_path) - .map_err(|err| miette::miette!(err.to_string())) - .context("reading immutable db tip")? - .ok_or(miette::miette!("immutable db has no tip"))?; - - let cursor = define_starting_point(args, domain.state())?; - - let iter = pallas::storage::hardano::immutable::read_blocks_from_point(immutable_path, cursor) - .map_err(|err| miette::miette!(err.to_string())) - .context("reading immutable db tip")?; - - let progress = feedback.slot_progress_bar(); - - progress.set_message("importing immutable db"); - progress.set_length(tip.slot_or_default()); - - for batch in iter.chunks(chunk_size).into_iter() { - let batch: Vec<_> = batch - .try_collect() - .into_diagnostic() - .context("reading block data")?; - - // we need to wrap them on a ref counter since bytes are going to be shared - // around throughout the pipeline - let batch: Vec<_> = batch.into_iter().map(Arc::new).collect(); - - let last = dolos_core::facade::import_blocks(&domain, batch) - .await - .map_err(|e| miette::miette!(e.to_string()))?; - - progress.set_position(last); - } - - progress.abandon_with_message("immutable db import complete"); - - Ok(()) -} - -#[tokio::main] -pub async fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> { - if args.verbose { - crate::common::setup_tracing(&config.logging)?; - } - - let mithril = config - .mithril - .as_ref() - .ok_or(miette::miette!("missing mithril config"))?; - - let target_directory = Path::new(&args.download_dir); - - if !target_directory.exists() { - std::fs::create_dir_all(target_directory) - .map_err(|err| miette::miette!(err.to_string())) - .context(format!( - "Failed to create directory: {}", - target_directory.display() - ))?; - } - - if !args.skip_download { - fetch_snapshot(args, mithril, feedback) - .await - .map_err(|err| miette::miette!(err.to_string())) - .context("fetching and validating mithril snapshot")?; - } else { - warn!("skipping download, assuming download dir has snapshot and it's validated") + archive: &impl ArchiveStore, +) -> Result { + if let Some(point) = &args.start_from { + return Ok(point.clone().try_into().unwrap()); } - let immutable_path = Path::new(&args.download_dir).join("immutable"); + let tip = archive + .get_tip() + .into_diagnostic() + .context("reading archive tip")?; - import_hardano_into_domain(args, config, &immutable_path, feedback, args.chunk_size).await?; + let Some((slot, raw)) = tip else { + return Ok(pallas::network::miniprotocols::Point::Origin); + }; - if !args.retain_snapshot { - info!("deleting downloaded 
snapshot"); + let block = dolos_cardano::owned::OwnedMultiEraBlock::decode(Arc::new(raw)) + .into_diagnostic() + .context("decoding archive tip")?; - std::fs::remove_dir_all(Path::new(&args.download_dir)) - .into_diagnostic() - .context("removing downloaded snapshot")?; - } + let point = ChainPoint::Specific(slot, block.hash()); - println!("bootstrap complete, run `dolos daemon` to start the node"); - - Ok(()) + Ok(point.try_into().unwrap()) } diff --git a/src/bin/dolos/bootstrap/mithril/mod.rs b/src/bin/dolos/bootstrap/mithril/mod.rs new file mode 100644 index 00000000..1231c5fb --- /dev/null +++ b/src/bin/dolos/bootstrap/mithril/mod.rs @@ -0,0 +1,139 @@ +use miette::{Context, IntoDiagnostic}; +use std::path::Path; +use tracing::{info, warn}; + +use dolos::prelude::*; + +use crate::feedback::Feedback; +use dolos_core::config::RootConfig; + +mod archive; +mod helpers; +mod state; + +#[derive(Debug, clap::Args, Clone)] +pub struct Args { + #[arg(long, default_value = "./snapshot")] + download_dir: String, + + /// Skip the bootstrap if there's already data in the stores + #[arg(long, action)] + skip_if_not_empty: bool, + + /// Skip the Mithril certificate validation + #[arg(long, action)] + skip_validation: bool, + + /// Assume the snapshot is already available in the download dir + #[arg(long, action)] + skip_download: bool, + + /// Retain downloaded snapshot instead of deleting it + #[arg(long, action)] + retain_snapshot: bool, + + /// Number of blocks to process in each chunk, more is faster but uses more + /// memory + #[arg(long, default_value = "500")] + chunk_size: usize, + + /// Only process the state pass + #[arg(long, action)] + state_only: bool, + + /// Only process the archive pass + #[arg(long, action)] + archive_only: bool, + + #[arg(long, action)] + verbose: bool, + + #[arg(long)] + start_from: Option, +} + +impl Default for Args { + fn default() -> Self { + Self { + download_dir: "./snapshot".to_string(), + skip_if_not_empty: Default::default(), + skip_validation: Default::default(), + skip_download: Default::default(), + retain_snapshot: Default::default(), + verbose: Default::default(), + chunk_size: 500, + state_only: Default::default(), + archive_only: Default::default(), + start_from: None, + } + } +} + +#[tokio::main] +pub async fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> { + if args.verbose { + crate::common::setup_tracing(&config.logging)?; + } + + let mithril = config + .mithril + .as_ref() + .ok_or(miette::miette!("missing mithril config"))?; + + let target_directory = Path::new(&args.download_dir); + + if !target_directory.exists() { + std::fs::create_dir_all(target_directory) + .map_err(|err| miette::miette!(err.to_string())) + .context(format!( + "Failed to create directory: {}", + target_directory.display() + ))?; + } + + if !args.skip_download { + helpers::fetch_snapshot(args, mithril, feedback) + .await + .map_err(|err| miette::miette!(err.to_string())) + .context("fetching and validating mithril snapshot")?; + } else { + warn!("skipping download, assuming download dir has snapshot and it's validated") + } + + if args.state_only && args.archive_only { + miette::bail!("cannot use --state-only and --archive-only together"); + } + + let run_state = !args.archive_only; + let run_archive = !args.state_only; + + let immutable_path = Path::new(&args.download_dir).join("immutable"); + + if run_state { + state::import_hardano_into_state(args, config, &immutable_path, feedback, args.chunk_size) + .await?; + } + + if run_archive { 
+ archive::import_hardano_into_archive( + args, + config, + &immutable_path, + feedback, + args.chunk_size, + ) + .await?; + } + + if !args.retain_snapshot { + info!("deleting downloaded snapshot"); + + std::fs::remove_dir_all(Path::new(&args.download_dir)) + .into_diagnostic() + .context("removing downloaded snapshot")?; + } + + println!("bootstrap complete, run `dolos daemon` to start the node"); + + Ok(()) +} diff --git a/src/bin/dolos/bootstrap/mithril/state.rs b/src/bin/dolos/bootstrap/mithril/state.rs new file mode 100644 index 00000000..9daf2678 --- /dev/null +++ b/src/bin/dolos/bootstrap/mithril/state.rs @@ -0,0 +1,60 @@ +use miette::{Context, IntoDiagnostic}; +use std::path::Path; +use std::sync::Arc; + +use itertools::Itertools; + +use dolos::prelude::*; + +use crate::feedback::Feedback; +use dolos_core::config::RootConfig; + +use super::helpers::define_starting_point; +use super::Args; + +pub(crate) async fn import_hardano_into_state( + args: &Args, + config: &RootConfig, + immutable_path: &Path, + feedback: &Feedback, + chunk_size: usize, +) -> Result<(), miette::Error> { + let domain = crate::common::setup_domain(config).await?; + + let tip = pallas::storage::hardano::immutable::get_tip(immutable_path) + .map_err(|err| miette::miette!(err.to_string())) + .context("reading immutable db tip")? + .ok_or(miette::miette!("immutable db has no tip"))?; + + let cursor = define_starting_point(args, domain.state())?; + + let iter = pallas::storage::hardano::immutable::read_blocks_from_point(immutable_path, cursor) + .map_err(|err| miette::miette!(err.to_string())) + .context("reading immutable db tip")?; + + let progress = feedback.slot_progress_bar(); + + progress.set_message("importing immutable db (state)"); + progress.set_length(tip.slot_or_default()); + + for batch in iter.chunks(chunk_size).into_iter() { + let batch: Vec<_> = batch + .try_collect() + .into_diagnostic() + .context("reading block data")?; + + // we need to wrap them on a ref counter since bytes are going to be shared + // around throughout the pipeline + let batch: Vec<_> = batch.into_iter().map(Arc::new).collect(); + + let last = dolos_core::facade::import_blocks_state_only(&domain, batch) + .await + .map_err(|e| miette::miette!(e.to_string()))?; + + progress.set_position(last); + } + + progress.abandon_with_message("immutable db state import complete"); + + Ok(()) +} From f299162ed832d804953835f31218ab0297bb6231 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 20 Jan 2026 12:26:59 -0300 Subject: [PATCH 2/6] add todo to track tech-debt --- crates/cardano/src/roll/txs.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/cardano/src/roll/txs.rs b/crates/cardano/src/roll/txs.rs index ea1b0b6d..68c0735e 100644 --- a/crates/cardano/src/roll/txs.rs +++ b/crates/cardano/src/roll/txs.rs @@ -86,7 +86,8 @@ fn unpack_cert(tags: &mut SlotTags, cert: &MultiEraCert) { } } -pub fn collect_slot_tags_from_block( +// TODO: this public method breaks all abstrations but it's an easy way to generate archive indexes in an isolated way as needed for the Mithril import process. 
+pub fn _hack_collect_slot_tags_from_block( block: &MultiEraBlock, resolved_inputs: &HashMap, tags: &mut SlotTags, From 1de70b17a2206aca7827f4c4d85ea7325caed27b Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Wed, 21 Jan 2026 01:32:18 -0300 Subject: [PATCH 3/6] fix renamed hack method --- src/bin/dolos/bootstrap/mithril/archive.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bin/dolos/bootstrap/mithril/archive.rs b/src/bin/dolos/bootstrap/mithril/archive.rs index 30d414d5..84f469f6 100644 --- a/src/bin/dolos/bootstrap/mithril/archive.rs +++ b/src/bin/dolos/bootstrap/mithril/archive.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use itertools::Itertools; use dolos::prelude::*; -use dolos_cardano::{owned::OwnedMultiEraOutput, roll::txs::collect_slot_tags_from_block}; +use dolos_cardano::{owned::OwnedMultiEraOutput, roll::txs::_hack_collect_slot_tags_from_block}; use pallas::ledger::traverse::MultiEraBlock; use crate::feedback::Feedback; @@ -99,7 +99,7 @@ pub(crate) async fn import_hardano_into_archive( utxos.insert_block_outputs(view)?; let mut tags = SlotTags::default(); - collect_slot_tags_from_block(view, &utxos.entries, &mut tags) + _hack_collect_slot_tags_from_block(view, &utxos.entries, &mut tags) .into_diagnostic() .context("computing block tags")?; From 69676a5dee2dba17074ab218da0462107e857b7f Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Wed, 21 Jan 2026 11:18:45 -0300 Subject: [PATCH 4/6] replace hashmap for redb in-memory --- crates/cardano/src/roll/txs.rs | 10 +- src/bin/dolos/bootstrap/mithril/archive.rs | 55 +++------ src/bin/dolos/bootstrap/mithril/mod.rs | 13 +-- src/bin/dolos/bootstrap/mithril/state.rs | 2 +- src/bin/dolos/bootstrap/mithril/utxo_cache.rs | 105 ++++++++++++++++++ 5 files changed, 133 insertions(+), 52 deletions(-) create mode 100644 src/bin/dolos/bootstrap/mithril/utxo_cache.rs diff --git a/crates/cardano/src/roll/txs.rs b/crates/cardano/src/roll/txs.rs index 68c0735e..c4a65206 100644 --- a/crates/cardano/src/roll/txs.rs +++ b/crates/cardano/src/roll/txs.rs @@ -87,7 +87,15 @@ fn unpack_cert(tags: &mut SlotTags, cert: &MultiEraCert) { } // TODO: this public method breaks all abstrations but it's an easy way to generate archive indexes in an isolated way as needed for the Mithril import process. 
-pub fn _hack_collect_slot_tags_from_block( +pub fn collect_slot_tags_from_block( + block: &MultiEraBlock, + resolved_inputs: &HashMap, + tags: &mut SlotTags, +) -> Result<(), ChainError> { + _hack_collect_slot_tags_from_block(block, resolved_inputs, tags) +} + +fn _hack_collect_slot_tags_from_block( block: &MultiEraBlock, resolved_inputs: &HashMap, tags: &mut SlotTags, diff --git a/src/bin/dolos/bootstrap/mithril/archive.rs b/src/bin/dolos/bootstrap/mithril/archive.rs index 84f469f6..a69f80c0 100644 --- a/src/bin/dolos/bootstrap/mithril/archive.rs +++ b/src/bin/dolos/bootstrap/mithril/archive.rs @@ -1,53 +1,20 @@ use miette::{Context, IntoDiagnostic}; -use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use itertools::Itertools; use dolos::prelude::*; -use dolos_cardano::{owned::OwnedMultiEraOutput, roll::txs::_hack_collect_slot_tags_from_block}; -use pallas::ledger::traverse::MultiEraBlock; +use dolos_cardano::roll::txs::collect_slot_tags_from_block; use crate::feedback::Feedback; use dolos_core::config::RootConfig; use super::helpers::define_archive_starting_point; +use super::utxo_cache::UtxoCache; use super::Args; -#[derive(Default)] -struct UtxoCache { - entries: HashMap, -} - -impl UtxoCache { - fn insert_block_outputs(&mut self, block: &MultiEraBlock) -> miette::Result<()> { - for tx in block.txs() { - let tx_hash = tx.hash(); - - for (idx, output) in tx.produces() { - let txo_ref = TxoRef(tx_hash, idx as u32); - let eracbor: EraCbor = output.into(); - let resolved = OwnedMultiEraOutput::decode(Arc::new(eracbor)).into_diagnostic()?; - - self.entries.insert(txo_ref, resolved); - } - } - - Ok(()) - } - - fn remove_block_inputs(&mut self, block: &MultiEraBlock) { - for tx in block.txs() { - for input in tx.consumes() { - let txo_ref: TxoRef = (&input).into(); - self.entries.remove(&txo_ref); - } - } - } -} - -pub(crate) async fn import_hardano_into_archive( +pub(crate) async fn import_hardano( args: &Args, config: &RootConfig, immutable_path: &Path, @@ -72,7 +39,7 @@ pub(crate) async fn import_hardano_into_archive( progress.set_message("importing immutable db (archive)"); progress.set_length(tip.slot_or_default()); - let mut utxos = UtxoCache::default(); + let utxos = UtxoCache::in_memory().context("initializing in-memory utxo cache")?; for batch in iter.chunks(chunk_size).into_iter() { let batch: Vec<_> = batch @@ -96,10 +63,16 @@ pub(crate) async fn import_hardano_into_archive( let point = block.point(); let view = block.view(); - utxos.insert_block_outputs(view)?; + utxos + .insert_block_outputs(view) + .context("caching block outputs")?; + + let resolved_inputs = utxos + .resolve_block_inputs(view) + .context("resolving block inputs")?; let mut tags = SlotTags::default(); - _hack_collect_slot_tags_from_block(view, &utxos.entries, &mut tags) + collect_slot_tags_from_block(view, &resolved_inputs, &mut tags) .into_diagnostic() .context("computing block tags")?; @@ -108,7 +81,9 @@ pub(crate) async fn import_hardano_into_archive( .map_err(|err| miette::miette!(format!("{err:?}"))) .context("writing archive block")?; - utxos.remove_block_inputs(view); + utxos + .remove_block_inputs(view) + .context("evicting block inputs")?; last_slot = Some(point.slot()); } diff --git a/src/bin/dolos/bootstrap/mithril/mod.rs b/src/bin/dolos/bootstrap/mithril/mod.rs index 1231c5fb..f43e3225 100644 --- a/src/bin/dolos/bootstrap/mithril/mod.rs +++ b/src/bin/dolos/bootstrap/mithril/mod.rs @@ -10,6 +10,7 @@ use dolos_core::config::RootConfig; mod archive; mod helpers; mod state; +mod 
utxo_cache; #[derive(Debug, clap::Args, Clone)] pub struct Args { @@ -110,19 +111,11 @@ pub async fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miett let immutable_path = Path::new(&args.download_dir).join("immutable"); if run_state { - state::import_hardano_into_state(args, config, &immutable_path, feedback, args.chunk_size) - .await?; + state::import_hardano(args, config, &immutable_path, feedback, args.chunk_size).await?; } if run_archive { - archive::import_hardano_into_archive( - args, - config, - &immutable_path, - feedback, - args.chunk_size, - ) - .await?; + archive::import_hardano(args, config, &immutable_path, feedback, args.chunk_size).await?; } if !args.retain_snapshot { diff --git a/src/bin/dolos/bootstrap/mithril/state.rs b/src/bin/dolos/bootstrap/mithril/state.rs index 9daf2678..efac300e 100644 --- a/src/bin/dolos/bootstrap/mithril/state.rs +++ b/src/bin/dolos/bootstrap/mithril/state.rs @@ -12,7 +12,7 @@ use dolos_core::config::RootConfig; use super::helpers::define_starting_point; use super::Args; -pub(crate) async fn import_hardano_into_state( +pub(crate) async fn import_hardano( args: &Args, config: &RootConfig, immutable_path: &Path, diff --git a/src/bin/dolos/bootstrap/mithril/utxo_cache.rs b/src/bin/dolos/bootstrap/mithril/utxo_cache.rs new file mode 100644 index 00000000..5fdefe25 --- /dev/null +++ b/src/bin/dolos/bootstrap/mithril/utxo_cache.rs @@ -0,0 +1,105 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use dolos_redb3::redb::{backends::InMemoryBackend, Database, ReadableDatabase, TableDefinition}; +use miette::IntoDiagnostic; +use pallas::ledger::traverse::MultiEraBlock; + +use dolos::prelude::*; +use dolos_cardano::owned::OwnedMultiEraOutput; + +type UtxosKey = (&'static [u8; 32], u32); +type UtxosValue = (u16, &'static [u8]); + +const UTXOS_TABLE: TableDefinition<'static, UtxosKey, UtxosValue> = TableDefinition::new("utxos"); + +pub(super) struct UtxoCache { + db: Database, +} + +impl UtxoCache { + pub(super) fn in_memory() -> miette::Result { + let db = Database::builder() + .create_with_backend(InMemoryBackend::new()) + .into_diagnostic()?; + let wx = db.begin_write().into_diagnostic()?; + wx.open_table(UTXOS_TABLE).into_diagnostic()?; + wx.commit().into_diagnostic()?; + + Ok(Self { db }) + } + + pub(super) fn insert_block_outputs(&self, block: &MultiEraBlock) -> miette::Result<()> { + let wx = self.db.begin_write().into_diagnostic()?; + { + let mut table = wx.open_table(UTXOS_TABLE).into_diagnostic()?; + + for tx in block.txs() { + let tx_hash = tx.hash(); + + for (idx, output) in tx.produces() { + let txo_ref = TxoRef(tx_hash, idx as u32); + let eracbor: EraCbor = output.into(); + let key: (&[u8; 32], u32) = (&txo_ref.0, txo_ref.1); + table + .insert(key, (eracbor.0, eracbor.1.as_slice())) + .into_diagnostic()?; + } + } + } + + wx.commit().into_diagnostic()?; + + Ok(()) + } + + pub(super) fn resolve_block_inputs( + &self, + block: &MultiEraBlock, + ) -> miette::Result> { + let rx = self.db.begin_read().into_diagnostic()?; + let table = rx.open_table(UTXOS_TABLE).into_diagnostic()?; + let mut resolved_inputs = HashMap::new(); + + for tx in block.txs() { + for input in tx.consumes() { + let txo_ref: TxoRef = (&input).into(); + + if resolved_inputs.contains_key(&txo_ref) { + continue; + } + + let key: (&[u8; 32], u32) = (&txo_ref.0, txo_ref.1); + if let Some(body) = table.get(key).into_diagnostic()? 
{ + let (era, cbor) = body.value(); + let eracbor = EraCbor(era, cbor.to_vec()); + let resolved = + OwnedMultiEraOutput::decode(Arc::new(eracbor)).into_diagnostic()?; + + resolved_inputs.insert(txo_ref, resolved); + } + } + } + + Ok(resolved_inputs) + } + + pub(super) fn remove_block_inputs(&self, block: &MultiEraBlock) -> miette::Result<()> { + let wx = self.db.begin_write().into_diagnostic()?; + { + let mut table = wx.open_table(UTXOS_TABLE).into_diagnostic()?; + + for tx in block.txs() { + for input in tx.consumes() { + let txo_ref: TxoRef = (&input).into(); + let key: (&[u8; 32], u32) = (&txo_ref.0, txo_ref.1); + table.remove(key).into_diagnostic()?; + } + } + } + + wx.commit().into_diagnostic()?; + + Ok(()) + } +} From 1dfb1b071ecefc323caec715b36873cac7960fab Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Wed, 21 Jan 2026 17:46:46 -0300 Subject: [PATCH 5/6] defer indexes --- crates/cardano/src/genesis/mod.rs | 4 +- crates/core/src/batch.rs | 4 +- crates/core/src/facade.rs | 16 +- crates/core/src/state.rs | 2 +- crates/redb3/src/state/mod.rs | 11 +- crates/redb3/src/state/utxoset.rs | 199 ++++++++++++++++------- crates/testing/src/toy_domain.rs | 6 +- src/bin/dolos/bootstrap/mithril/state.rs | 10 +- src/facade.rs | 2 +- 9 files changed, 181 insertions(+), 73 deletions(-) diff --git a/crates/cardano/src/genesis/mod.rs b/crates/cardano/src/genesis/mod.rs index 8da74557..cc83179c 100644 --- a/crates/cardano/src/genesis/mod.rs +++ b/crates/cardano/src/genesis/mod.rs @@ -118,10 +118,10 @@ pub fn bootstrap_utxos( let writer = state.start_writer()?; let delta = crate::utxoset::compute_origin_delta(genesis); - writer.apply_utxoset(&delta)?; + writer.apply_utxoset(&delta, true)?; let delta = crate::utxoset::build_custom_utxos_delta(config)?; - writer.apply_utxoset(&delta)?; + writer.apply_utxoset(&delta, true)?; writer.commit()?; diff --git a/crates/core/src/batch.rs b/crates/core/src/batch.rs index 78f7c185..ed93a043 100644 --- a/crates/core/src/batch.rs +++ b/crates/core/src/batch.rs @@ -283,7 +283,7 @@ impl WorkBatch { Ok(()) } - pub fn commit_state(&mut self, domain: &D) -> Result<(), DomainError> + pub fn commit_state(&mut self, domain: &D, update_indexes: bool) -> Result<(), DomainError> where D: Domain, { @@ -298,7 +298,7 @@ impl WorkBatch { // this into the entity system. 
for block in self.blocks.iter() { if let Some(utxo_delta) = &block.utxo_delta { - writer.apply_utxoset(utxo_delta)?; + writer.apply_utxoset(utxo_delta, update_indexes)?; } } diff --git a/crates/core/src/facade.rs b/crates/core/src/facade.rs index 6b10f6e1..06f0dbc9 100644 --- a/crates/core/src/facade.rs +++ b/crates/core/src/facade.rs @@ -29,7 +29,7 @@ fn execute_batch( batch.apply_entities()?; - batch.commit_state(domain)?; + batch.commit_state(domain, true)?; batch.commit_archive(domain)?; @@ -41,6 +41,7 @@ fn execute_batch_state_only( domain: &D, batch: &mut WorkBatch, with_wal: bool, + update_indexes: bool, ) -> Result { batch.load_utxos(domain)?; @@ -57,7 +58,7 @@ fn execute_batch_state_only( batch.apply_entities()?; - batch.commit_state(domain)?; + batch.commit_state(domain, update_indexes)?; Ok(batch.last_slot()) } @@ -115,6 +116,7 @@ fn execute_work_state_only( domain: &D, work: &mut WorkUnit, live: bool, + update_indexes: bool, ) -> Result<(), DomainError> { match work { WorkUnit::Genesis => { @@ -131,7 +133,7 @@ fn execute_work_state_only( chain.apply_rupd::(domain.state(), domain.archive(), domain.genesis(), *slot)?; } WorkUnit::Blocks(batch) => { - execute_batch_state_only(chain, domain, batch, live)?; + execute_batch_state_only(chain, domain, batch, live, update_indexes)?; } WorkUnit::ForcedStop => { return Err(DomainError::StopEpochReached); @@ -161,9 +163,10 @@ async fn drain_pending_work_state_only( chain: &mut D::Chain, domain: &D, live: bool, + update_indexes: bool, ) -> Result<(), DomainError> { while let Some(mut work) = chain.pop_work() { - execute_work_state_only(chain, domain, &mut work, live)?; + execute_work_state_only(chain, domain, &mut work, live, update_indexes)?; } Ok(()) @@ -193,20 +196,21 @@ pub async fn import_blocks( pub async fn import_blocks_state_only( domain: &D, mut raw: Vec, + update_indexes: bool, ) -> Result { let mut last = 0; let mut chain = domain.write_chain().await; for block in raw.drain(..) 
{ if !chain.can_receive_block() { - drain_pending_work_state_only(&mut *chain, domain, false).await?; + drain_pending_work_state_only(&mut *chain, domain, false, update_indexes).await?; } last = chain.receive_block(block)?; } // one last drain to ensure we're up to date - drain_pending_work_state_only(&mut *chain, domain, false).await?; + drain_pending_work_state_only(&mut *chain, domain, false, update_indexes).await?; Ok(last) } diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs index cb2cc083..d3cf7033 100644 --- a/crates/core/src/state.rs +++ b/crates/core/src/state.rs @@ -276,7 +276,7 @@ pub trait StateWriter: Sized + Send + Sync { fn delete_entity(&self, ns: Namespace, key: &EntityKey) -> Result<(), StateError>; - fn apply_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>; + fn apply_utxoset(&self, delta: &UtxoSetDelta, update_indexes: bool) -> Result<(), StateError>; #[allow(clippy::double_must_use)] #[must_use] diff --git a/crates/redb3/src/state/mod.rs b/crates/redb3/src/state/mod.rs index cd5be774..c26af2f6 100644 --- a/crates/redb3/src/state/mod.rs +++ b/crates/redb3/src/state/mod.rs @@ -217,9 +217,16 @@ impl dolos_core::StateWriter for StateWriter { Ok(()) } - fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> { + fn apply_utxoset( + &self, + delta: &dolos_core::UtxoSetDelta, + update_indexes: bool, + ) -> Result<(), StateError> { utxoset::UtxosTable::apply(&self.wx, delta)?; - utxoset::FilterIndexes::apply(&self.wx, delta)?; + + if update_indexes { + utxoset::FilterIndexes::apply(&self.wx, delta)?; + } Ok(()) } diff --git a/crates/redb3/src/state/utxoset.rs b/crates/redb3/src/state/utxoset.rs index ad324c55..a53d3a8e 100644 --- a/crates/redb3/src/state/utxoset.rs +++ b/crates/redb3/src/state/utxoset.rs @@ -4,8 +4,8 @@ use pallas::{ ledger::{addresses::ShelleyDelegationPart, traverse::MultiEraOutput}, }; use redb::{ - MultimapTableDefinition, Range, ReadTransaction, ReadableDatabase, ReadableTable as _, - ReadableTableMetadata as _, TableDefinition, TableStats, WriteTransaction, + MultimapTable, MultimapTableDefinition, Range, ReadTransaction, ReadableDatabase, + ReadableTable as _, ReadableTableMetadata as _, TableDefinition, TableStats, WriteTransaction, }; use std::{ collections::{HashMap, HashSet}, @@ -344,6 +344,86 @@ impl FilterIndexes { } } + fn index_output( + address_table: &mut MultimapTable<&[u8], UtxosKey>, + payment_table: &mut MultimapTable<&[u8], UtxosKey>, + stake_table: &mut MultimapTable<&[u8], UtxosKey>, + policy_table: &mut MultimapTable<&[u8], UtxosKey>, + asset_table: &mut MultimapTable<&[u8], UtxosKey>, + utxo: (&[u8; 32], u32), + body: &MultiEraOutput, + ) -> Result<(), Error> { + let SplitAddressResult(addr, pay, stake) = Self::split_address(body)?; + + if let Some(k) = addr { + address_table.insert(k.as_slice(), utxo)?; + } + + if let Some(k) = pay { + payment_table.insert(k.as_slice(), utxo)?; + } + + if let Some(k) = stake { + stake_table.insert(k.as_slice(), utxo)?; + } + + let value = body.value(); + let assets = value.assets(); + + for batch in assets { + policy_table.insert(batch.policy().as_slice(), utxo)?; + + for asset in batch.assets() { + let mut subject = asset.policy().to_vec(); + subject.extend(asset.name()); + + asset_table.insert(subject.as_slice(), utxo)?; + } + } + + Ok(()) + } + + fn unindex_output( + address_table: &mut MultimapTable<&[u8], UtxosKey>, + payment_table: &mut MultimapTable<&[u8], UtxosKey>, + stake_table: &mut MultimapTable<&[u8], UtxosKey>, + policy_table: 
&mut MultimapTable<&[u8], UtxosKey>, + asset_table: &mut MultimapTable<&[u8], UtxosKey>, + utxo: (&[u8; 32], u32), + body: &MultiEraOutput, + ) -> Result<(), Error> { + let SplitAddressResult(addr, pay, stake) = Self::split_address(body)?; + + if let Some(k) = addr { + address_table.remove(k.as_slice(), utxo)?; + } + + if let Some(k) = pay { + payment_table.remove(k.as_slice(), utxo)?; + } + + if let Some(k) = stake { + stake_table.remove(k.as_slice(), utxo)?; + } + + let value = body.value(); + let assets = value.assets(); + + for batch in assets { + policy_table.remove(batch.policy().as_slice(), utxo)?; + + for asset in batch.assets() { + let mut subject = asset.policy().to_vec(); + subject.extend(asset.name()); + + asset_table.remove(subject.as_slice(), utxo)?; + } + } + + Ok(()) + } + pub fn apply(wx: &WriteTransaction, delta: &UtxoSetDelta) -> Result<(), Error> { let mut address_table = wx.open_multimap_table(Self::BY_ADDRESS)?; let mut payment_table = wx.open_multimap_table(Self::BY_PAYMENT)?; @@ -361,33 +441,16 @@ impl FilterIndexes { // TODO: decoding here is very inefficient let body = MultiEraOutput::try_from(body.as_ref()).unwrap(); - let SplitAddressResult(addr, pay, stake) = Self::split_address(&body)?; - - if let Some(k) = addr { - address_table.insert(k.as_slice(), v)?; - } - - if let Some(k) = pay { - payment_table.insert(k.as_slice(), v)?; - } - - if let Some(k) = stake { - stake_table.insert(k.as_slice(), v)?; - } - - let value = body.value(); - let assets = value.assets(); - for batch in assets { - policy_table.insert(batch.policy().as_slice(), v)?; - - for asset in batch.assets() { - let mut subject = asset.policy().to_vec(); - subject.extend(asset.name()); - - asset_table.insert(subject.as_slice(), v)?; - } - } + Self::index_output( + &mut address_table, + &mut payment_table, + &mut stake_table, + &mut policy_table, + &mut asset_table, + v, + &body, + )?; } let forgettable = delta.consumed_utxo.iter().chain(delta.undone_utxo.iter()); @@ -398,33 +461,15 @@ impl FilterIndexes { // TODO: decoding here is very inefficient let body = MultiEraOutput::try_from(body.as_ref()).unwrap(); - let SplitAddressResult(addr, pay, stake) = Self::split_address(&body)?; - - if let Some(k) = addr { - address_table.remove(k.as_slice(), v)?; - } - - if let Some(k) = pay { - payment_table.remove(k.as_slice(), v)?; - } - - if let Some(k) = stake { - stake_table.remove(k.as_slice(), v)?; - } - - let value = body.value(); - let assets = value.assets(); - - for batch in assets { - policy_table.remove(batch.policy().as_slice(), v)?; - - for asset in batch.assets() { - let mut subject = asset.policy().to_vec(); - subject.extend(asset.name()); - - asset_table.remove(subject.as_slice(), v)?; - } - } + Self::unindex_output( + &mut address_table, + &mut payment_table, + &mut stake_table, + &mut policy_table, + &mut asset_table, + v, + &body, + )?; } Ok(()) @@ -503,6 +548,48 @@ impl StateStore { Ok(HashMap::from_iter(all_tables)) } + pub fn rebuild_utxo_indexes(&self) -> Result<(), Error> { + let rx = self.db().begin_read()?; + let wx = self.db().begin_write()?; + + { + let _ = wx.delete_multimap_table(FilterIndexes::BY_ADDRESS)?; + let _ = wx.delete_multimap_table(FilterIndexes::BY_PAYMENT)?; + let _ = wx.delete_multimap_table(FilterIndexes::BY_STAKE)?; + let _ = wx.delete_multimap_table(FilterIndexes::BY_POLICY)?; + let _ = wx.delete_multimap_table(FilterIndexes::BY_ASSET)?; + + let mut address_table = wx.open_multimap_table(FilterIndexes::BY_ADDRESS)?; + let mut payment_table = 
wx.open_multimap_table(FilterIndexes::BY_PAYMENT)?; + let mut stake_table = wx.open_multimap_table(FilterIndexes::BY_STAKE)?; + let mut policy_table = wx.open_multimap_table(FilterIndexes::BY_POLICY)?; + let mut asset_table = wx.open_multimap_table(FilterIndexes::BY_ASSET)?; + + let utxos = UtxosTable::iter(&rx)?; + + for entry in utxos { + let (utxo, body) = entry.map_err(Error::from)?; + let output = + MultiEraOutput::try_from(&body).map_err(|_| Error::InvalidOperation)?; + let key: (&[u8; 32], u32) = (&utxo.0, utxo.1); + + FilterIndexes::index_output( + &mut address_table, + &mut payment_table, + &mut stake_table, + &mut policy_table, + &mut asset_table, + key, + &output, + )?; + } + } + + wx.commit()?; + + Ok(()) + } + pub fn get_datum(&self, datum_hash: &Hash<32>) -> Result>, Error> { let rx = self.db().begin_read()?; DatumsTable::get(&rx, datum_hash) @@ -530,7 +617,7 @@ mod tests { ($store:expr, $deltas:expr) => { let writer = $store.start_writer().unwrap(); for delta in $deltas.iter() { - writer.apply_utxoset(&delta).unwrap(); + writer.apply_utxoset(&delta, true).unwrap(); } writer.commit().unwrap(); }; diff --git a/crates/testing/src/toy_domain.rs b/crates/testing/src/toy_domain.rs index 6e5686b4..a7ebba9c 100644 --- a/crates/testing/src/toy_domain.rs +++ b/crates/testing/src/toy_domain.rs @@ -17,7 +17,8 @@ pub fn seed_random_memory_store(utxo_generator: impl UtxoGenerator) -> impl Stat let delta = make_custom_utxo_delta(everyone, utxos_per_address, utxo_generator); let writer = store.start_writer().unwrap(); - writer.apply_utxoset(&delta).unwrap(); + writer.apply_utxoset(&delta, true).unwrap(); + writer.commit().unwrap(); store @@ -127,7 +128,8 @@ impl ToyDomain { if let Some(delta) = initial_delta { let writer = domain.state.start_writer().unwrap(); - writer.apply_utxoset(&delta).unwrap(); + writer.apply_utxoset(&delta, true).unwrap(); + writer.commit().unwrap(); } diff --git a/src/bin/dolos/bootstrap/mithril/state.rs b/src/bin/dolos/bootstrap/mithril/state.rs index efac300e..7cff9734 100644 --- a/src/bin/dolos/bootstrap/mithril/state.rs +++ b/src/bin/dolos/bootstrap/mithril/state.rs @@ -47,13 +47,21 @@ pub(crate) async fn import_hardano( // around throughout the pipeline let batch: Vec<_> = batch.into_iter().map(Arc::new).collect(); - let last = dolos_core::facade::import_blocks_state_only(&domain, batch) + let last = dolos_core::facade::import_blocks_state_only(&domain, batch, false) .await .map_err(|e| miette::miette!(e.to_string()))?; progress.set_position(last); } + import_result?; + + domain + .state() + .rebuild_utxo_indexes() + .map_err(|err| miette::miette!(format!("{err:?}"))) + .context("rebuilding state indexes")?; + progress.abandon_with_message("immutable db state import complete"); Ok(()) diff --git a/src/facade.rs b/src/facade.rs index a4638e19..0d093c90 100644 --- a/src/facade.rs +++ b/src/facade.rs @@ -61,7 +61,7 @@ where let utxo_undo = dolos_cardano::utxoset::compute_undo_delta(blockd, &inputs) .map_err(dolos_core::ChainError::from)?; - writer.apply_utxoset(&utxo_undo)?; + writer.apply_utxoset(&utxo_undo, true)?; // TODO: we should differ notifications until the we commit the writers self.notify_tip(TipEvent::Undo(point.clone(), block)); From 875b1f1a6957215ad13f54deb7ec92d6a566e97d Mon Sep 17 00:00:00 2001 From: Santiago Date: Wed, 21 Jan 2026 23:50:17 -0300 Subject: [PATCH 6/6] fix compile typo --- src/bin/dolos/bootstrap/mithril/state.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/bin/dolos/bootstrap/mithril/state.rs 
b/src/bin/dolos/bootstrap/mithril/state.rs index 7cff9734..be596591 100644 --- a/src/bin/dolos/bootstrap/mithril/state.rs +++ b/src/bin/dolos/bootstrap/mithril/state.rs @@ -54,8 +54,6 @@ pub(crate) async fn import_hardano( progress.set_position(last); } - import_result?; - domain .state() .rebuild_utxo_indexes()