Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .ongoing/phased-mithril-import.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Phased Mithril Import Plan

## Goal
Split Mithril bootstrap into two passes (state, archive), skip WAL during bootstrap, and make archive import fast and independent of state.

## Work Completed
- Split Mithril bootstrap into modules:
  - `src/bin/dolos/bootstrap/mithril/mod.rs` (facade: CLI args + run)
  - `src/bin/dolos/bootstrap/mithril/helpers.rs` (snapshot download + starting point helpers)
  - `src/bin/dolos/bootstrap/mithril/state.rs` (state pass)
  - `src/bin/dolos/bootstrap/mithril/archive.rs` (archive pass)
- Added state-only import in core:
  - `crates/core/src/facade.rs` now exposes `import_blocks_state_only` with state-only batch execution helpers (no archive commit).
- Updated slot tag collection to accept resolved inputs:
  - `crates/cardano/src/roll/txs.rs` signature is now `collect_slot_tags_from_block(block, resolved_inputs, tags)`.
  - Uses resolved inputs to index input-derived tags (addresses, assets, datums).
- Archive pass uses an in-memory UTxO cache (no disk cache):
  - Insert outputs into cache (decoded once).
  - Build tags using resolved inputs from cache (skip missing inputs).
  - Apply to archive.
  - Remove consumed inputs from cache.
- Archive pass no longer resolves inputs from archive on-demand.
- State pass still uses `import_blocks_state_only`.
- CLI flags `--state-only` and `--archive-only` remain in the facade.

## Key Decisions
- WAL is skipped entirely during bootstrap.
- Archive pass uses only tx logs/tags with in-memory UTxO cache.
- Missing inputs during archive pass are skipped (no error).
- No cache size limits yet (memory-heavy acceptable for now).

## Files Touched
- `src/bin/dolos/bootstrap/mithril/mod.rs`
- `src/bin/dolos/bootstrap/mithril/helpers.rs`
- `src/bin/dolos/bootstrap/mithril/state.rs`
- `src/bin/dolos/bootstrap/mithril/archive.rs`
- `crates/core/src/facade.rs`
- `crates/cardano/src/roll/txs.rs`

## Notes
- Compile checks ran successfully after refactor.
- UTxO cache is archive-only and not reused in helpers.
- `helpers.rs` is the preferred naming over `common.rs`.
4 changes: 2 additions & 2 deletions crates/cardano/src/genesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ pub fn bootstrap_utxos<D: Domain>(
let writer = state.start_writer()?;

let delta = crate::utxoset::compute_origin_delta(genesis);
writer.apply_utxoset(&delta)?;
writer.apply_utxoset(&delta, true)?;

let delta = crate::utxoset::build_custom_utxos_delta(config)?;
writer.apply_utxoset(&delta)?;
writer.apply_utxoset(&delta, true)?;

writer.commit()?;

Expand Down
72 changes: 72 additions & 0 deletions crates/cardano/src/roll/txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,78 @@ fn unpack_cert(tags: &mut SlotTags, cert: &MultiEraCert) {
}
}

// TODO: this public method breaks all abstractions, but it's an easy way to
// generate archive indexes in an isolated way, as needed for the Mithril
// import process.
/// Collects the slot tags of `block` into `tags`.
///
/// Thin public entry point: forwards `block`, `resolved_inputs` and `tags`
/// unchanged to `_hack_collect_slot_tags_from_block` and returns its result.
pub fn collect_slot_tags_from_block(
block: &MultiEraBlock,
resolved_inputs: &HashMap<TxoRef, OwnedMultiEraOutput>,
tags: &mut SlotTags,
) -> Result<(), ChainError> {
_hack_collect_slot_tags_from_block(block, resolved_inputs, tags)
}

/// Populates `tags` with the index entries derived from `block`.
///
/// `tags.number` is set to the block number. Then, for every transaction and
/// in this order, the function appends:
///
/// 1. the transaction hash;
/// 2. each metadata key;
/// 3. each consumed input as a spent TxO and, when that input is present in
///    `resolved_inputs`, the address / asset / datum tags of the output it
///    resolves to;
/// 4. the address / asset / datum tags of each produced output;
/// 5. the original hash of each item returned by `tx.plutus_data()`;
/// 6. the tags of each certificate;
/// 7. the computed hash of each redeemer's data (stored in `tags.datums`).
///
/// Inputs absent from `resolved_inputs` are still recorded as spent but
/// contribute no derived tags. An address that fails to decode is skipped
/// without raising an error.
fn _hack_collect_slot_tags_from_block(
    block: &MultiEraBlock,
    resolved_inputs: &HashMap<TxoRef, OwnedMultiEraOutput>,
    tags: &mut SlotTags,
) -> Result<(), ChainError> {
    tags.number = Some(block.number());

    for tx in block.txs() {
        tags.tx_hashes.push(tx.hash().to_vec());

        for (label, _) in tx.metadata().collect::<Vec<_>>() {
            tags.metadata.push(label);
        }

        for consumed in tx.consumes() {
            let txoref: TxoRef = (&consumed).into();
            tags.spent_txo.push(txoref.clone().into());

            // Unresolved inputs only contribute the spent-TxO entry above.
            let Some(owned) = resolved_inputs.get(&txoref) else {
                continue;
            };

            owned.with_dependent(|_, spent| {
                if let Ok(address) = spent.address() {
                    unpack_address(tags, &address);
                }

                unpack_assets(tags, &spent.value());

                if let Some(datum) = spent.datum() {
                    unpack_datum(tags, &datum);
                }

                Result::<_, ChainError>::Ok(())
            })?;
        }

        for (_, produced) in tx.produces() {
            if let Ok(address) = produced.address() {
                unpack_address(tags, &address);
            }

            unpack_assets(tags, &produced.value());

            if let Some(datum) = produced.datum() {
                unpack_datum(tags, &datum);
            }
        }

        tags.datums.extend(
            tx.plutus_data()
                .into_iter()
                .map(|datum| datum.original_hash().to_vec()),
        );

        for cert in tx.certs() {
            unpack_cert(tags, &cert);
        }

        // Redeemer data hashes share the `datums` tag list.
        tags.datums.extend(
            tx.redeemers()
                .into_iter()
                .map(|redeemer| redeemer.data().compute_hash().to_vec()),
        );
    }

    Ok(())
}

impl BlockVisitor for TxLogVisitor {
fn visit_root(
&mut self,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl<C: ChainLogic> WorkBatch<C> {
Ok(())
}

pub fn commit_state<D>(&mut self, domain: &D) -> Result<(), DomainError>
pub fn commit_state<D>(&mut self, domain: &D, update_indexes: bool) -> Result<(), DomainError>
where
D: Domain<Chain = C>,
{
Expand All @@ -298,7 +298,7 @@ impl<C: ChainLogic> WorkBatch<C> {
// this into the entity system.
for block in self.blocks.iter() {
if let Some(utxo_delta) = &block.utxo_delta {
writer.apply_utxoset(utxo_delta)?;
writer.apply_utxoset(utxo_delta, update_indexes)?;
}
}

Expand Down
100 changes: 99 additions & 1 deletion crates/core/src/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,40 @@ fn execute_batch<D: Domain>(

batch.apply_entities()?;

batch.commit_state(domain)?;
batch.commit_state(domain, true)?;

batch.commit_archive(domain)?;

Ok(batch.last_slot())
}

/// Runs a block batch through the state pipeline without committing to the
/// archive.
///
/// Steps, in order: load and decode the batch's UTxOs, let the chain compute
/// its delta, optionally commit the batch to the WAL (`with_wal`), load and
/// apply entities, then commit state. `update_indexes` is forwarded to
/// `commit_state`. No `commit_archive` call is made here.
///
/// Returns the batch's last slot; any failing step is propagated via `?`.
fn execute_batch_state_only<D: Domain>(
chain: &D::Chain,
domain: &D,
batch: &mut WorkBatch<D::Chain>,
with_wal: bool,
update_indexes: bool,
) -> Result<BlockSlot, DomainError> {
batch.load_utxos(domain)?;

batch.decode_utxos(chain)?;

// Chain-specific logic
chain.compute_delta::<D>(domain.state(), domain.genesis(), batch)?;

// WAL commit is opt-in so callers can skip it entirely
if with_wal {
batch.commit_wal(domain)?;
}

batch.load_entities(domain)?;

batch.apply_entities()?;

// state is committed, but the archive is deliberately left untouched
batch.commit_state(domain, update_indexes)?;

Ok(batch.last_slot())
}

fn notify_work<D: Domain>(domain: &D, work: &WorkUnit<D::Chain>) {
let WorkUnit::Blocks(batch) = work else {
return;
Expand Down Expand Up @@ -84,6 +111,42 @@ fn execute_work<D: Domain>(
Ok(())
}

/// Executes a single work unit, routing block batches through
/// `execute_batch_state_only` (no archive commit for blocks).
///
/// `live` serves two purposes here: it is passed as the `with_wal` argument
/// of `execute_batch_state_only`, and it gates the `notify_work` call after
/// the unit has been processed. `update_indexes` is only used by the
/// `Blocks` arm.
///
/// # Errors
///
/// Returns `DomainError::StopEpochReached` for `WorkUnit::ForcedStop`;
/// otherwise propagates any error from the chain / WAL / batch calls.
fn execute_work_state_only<D: Domain>(
chain: &mut D::Chain,
domain: &D,
work: &mut WorkUnit<D::Chain>,
live: bool,
update_indexes: bool,
) -> Result<(), DomainError> {
match work {
WorkUnit::Genesis => {
chain.apply_genesis::<D>(domain.state(), domain.genesis())?;
// NOTE(review): the WAL is still reset to origin here regardless of
// `live` — confirm this is intended when bootstrap skips the WAL.
domain.wal().reset_to(&ChainPoint::Origin)?;
}
WorkUnit::EWrap(slot) => {
chain.apply_ewrap::<D>(domain.state(), domain.archive(), domain.genesis(), *slot)?;
}
WorkUnit::EStart(slot) => {
chain.apply_estart::<D>(domain.state(), domain.archive(), domain.genesis(), *slot)?;
}
WorkUnit::Rupd(slot) => {
chain.apply_rupd::<D>(domain.state(), domain.archive(), domain.genesis(), *slot)?;
}
WorkUnit::Blocks(batch) => {
// `live` is forwarded as the batch's `with_wal` flag
execute_batch_state_only(chain, domain, batch, live, update_indexes)?;
}
WorkUnit::ForcedStop => {
return Err(DomainError::StopEpochReached);
}
};

// notifications are only emitted in live mode
if live {
notify_work(domain, work);
}

Ok(())
}

async fn drain_pending_work<D: Domain>(
chain: &mut D::Chain,
domain: &D,
Expand All @@ -96,6 +159,19 @@ async fn drain_pending_work<D: Domain>(
Ok(())
}

/// Pops work units from `chain` until `pop_work` returns `None`, executing
/// each one through `execute_work_state_only` with the given `live` and
/// `update_indexes` flags.
///
/// Stops at, and returns, the first error produced by a work unit.
async fn drain_pending_work_state_only<D: Domain>(
chain: &mut D::Chain,
domain: &D,
live: bool,
update_indexes: bool,
) -> Result<(), DomainError> {
while let Some(mut work) = chain.pop_work() {
execute_work_state_only(chain, domain, &mut work, live, update_indexes)?;
}

Ok(())
}

pub async fn import_blocks<D: Domain>(
domain: &D,
mut raw: Vec<RawBlock>,
Expand All @@ -117,6 +193,28 @@ pub async fn import_blocks<D: Domain>(
Ok(last)
}

/// Feeds `raw` blocks into the chain and executes the resulting work through
/// the state-only pipeline.
///
/// Pending work is drained (with `live = false`) whenever the chain reports
/// it cannot receive another block, and once more after the last block.
/// `update_indexes` is forwarded to every drain.
///
/// Returns the value of the last `receive_block` call, or `0` when `raw` is
/// empty.
///
/// # Errors
///
/// Propagates the first error from `receive_block` or from draining work.
pub async fn import_blocks_state_only<D: Domain>(
    domain: &D,
    mut raw: Vec<RawBlock>,
    update_indexes: bool,
) -> Result<BlockSlot, DomainError> {
    let mut chain = domain.write_chain().await;
    let mut last_slot = 0;

    for raw_block in raw.drain(..) {
        // make room before handing over the next block
        if !chain.can_receive_block() {
            drain_pending_work_state_only(&mut *chain, domain, false, update_indexes).await?;
        }

        last_slot = chain.receive_block(raw_block)?;
    }

    // final flush so that nothing is left queued once we return
    drain_pending_work_state_only(&mut *chain, domain, false, update_indexes).await?;

    Ok(last_slot)
}

pub async fn roll_forward<D: Domain>(
domain: &D,
block: RawBlock,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ pub trait StateWriter: Sized + Send + Sync {

fn delete_entity(&self, ns: Namespace, key: &EntityKey) -> Result<(), StateError>;

fn apply_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>;
fn apply_utxoset(&self, delta: &UtxoSetDelta, update_indexes: bool) -> Result<(), StateError>;

#[allow(clippy::double_must_use)]
#[must_use]
Expand Down
11 changes: 9 additions & 2 deletions crates/redb3/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,16 @@ impl dolos_core::StateWriter for StateWriter {
Ok(())
}

fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> {
fn apply_utxoset(
&self,
delta: &dolos_core::UtxoSetDelta,
update_indexes: bool,
) -> Result<(), StateError> {
utxoset::UtxosTable::apply(&self.wx, delta)?;
utxoset::FilterIndexes::apply(&self.wx, delta)?;

if update_indexes {
utxoset::FilterIndexes::apply(&self.wx, delta)?;
}

Ok(())
}
Expand Down
Loading
Loading