From 2f076cb46531d38b4859996b7f64114f454d35ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Mon, 8 Dec 2025 14:19:26 +0200 Subject: [PATCH] feat: better visibility around exiting using log, reason and exitCode --- src/bin/dolos/common.rs | 35 +++++++++++++++++++++++++----- src/bin/dolos/daemon.rs | 16 ++++++++++++-- src/bin/dolos/data/summary.rs | 2 +- src/sync/apply.rs | 41 ++++++++++++++++++++++++++++++----- src/sync/mod.rs | 3 ++- src/sync/pull.rs | 31 +++++++++++++++++++++----- 6 files changed, 108 insertions(+), 20 deletions(-) diff --git a/src/bin/dolos/common.rs b/src/bin/dolos/common.rs index fde62319..f601bf1a 100644 --- a/src/bin/dolos/common.rs +++ b/src/bin/dolos/common.rs @@ -9,6 +9,7 @@ use tracing_subscriber::{filter::Targets, prelude::*}; use dolos::adapters::{DomainAdapter, WalAdapter}; use dolos::core::Genesis; use dolos::prelude::*; +use dolos::sync::apply::{is_quota_reached, is_stop_epoch_reached}; pub struct Stores { pub wal: WalAdapter, @@ -261,27 +262,51 @@ pub fn hook_exit_token() -> CancellationToken { cancel } -pub async fn run_pipeline(pipeline: gasket::daemon::Daemon, exit: CancellationToken) { +#[derive(Debug, PartialEq)] +pub enum PipelineStopReason { + /// Pipeline stopped due to a signal (SIGINT, SIGTERM) + Signal, + /// Pipeline stopped due to reaching sync quota limits + QuotaReached, + /// Pipeline stopped due to reaching configured stop epoch + StopEpochReached, + /// Pipeline stopped for other reasons + Other, +} + +pub async fn run_pipeline( + pipeline: gasket::daemon::Daemon, + exit: CancellationToken, +) -> PipelineStopReason { + let mut stop_reason = PipelineStopReason::Other; + loop { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(5)) => { if pipeline.should_stop() { - debug!("pipeline should stop"); - + info!("pipeline is stopping gracefully - one or more stages have completed their work"); + if is_stop_epoch_reached() { + stop_reason = PipelineStopReason::StopEpochReached; + } else if is_quota_reached() { + stop_reason = PipelineStopReason::QuotaReached; + } // trigger cancel so that stages stop early exit.cancel(); break; } } _ = exit.cancelled() => { - debug!("exit requested"); + info!("exit requested, shutting down pipeline"); + stop_reason = PipelineStopReason::Signal; break; } } } - debug!("shutting down pipeline"); + info!("shutting down pipeline"); pipeline.teardown(); + + stop_reason } pub fn cleanup_data(config: &RootConfig) -> Result<(), std::io::Error> { diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index 69b315f0..cde8526e 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -42,9 +42,21 @@ pub async fn run(config: RootConfig, _args: &Args) -> miette::Result<()> { } } - sync.await.unwrap(); + let stop_reason = sync.await.unwrap(); warn!("shutdown complete"); - Ok(()) + // Return appropriate exit code based on why the pipeline stopped + let exit_code = match stop_reason { + crate::common::PipelineStopReason::Signal => 0, + crate::common::PipelineStopReason::QuotaReached => 1, + crate::common::PipelineStopReason::StopEpochReached => 0, + crate::common::PipelineStopReason::Other => 2, + }; + + if exit_code == 0 { + Ok(()) + } else { + std::process::exit(exit_code); + } } diff --git a/src/bin/dolos/data/summary.rs b/src/bin/dolos/data/summary.rs index 520575f6..38040bb1 100644 --- a/src/bin/dolos/data/summary.rs +++ b/src/bin/dolos/data/summary.rs @@ -1,6 +1,6 @@ -use dolos_core::config::RootConfig; use dolos::cli::{ArchiveSummary, DataSummary, StateSummary, WalSummary}; use dolos::prelude::*; +use dolos_core::config::RootConfig; #[derive(Debug, clap::Args)] pub struct Args {} diff --git a/src/sync/apply.rs b/src/sync/apply.rs index fa731ee0..d55c56f2 100644 --- a/src/sync/apply.rs +++ b/src/sync/apply.rs @@ -1,8 +1,29 @@ use gasket::{framework::*, messaging::Message}; +use std::sync::atomic::{AtomicBool, Ordering}; use tracing::debug; use crate::{adapters::DomainAdapter, facade::DomainExt as _, prelude::*}; +type WorkerError = gasket::framework::WorkerError; + +static STOP_EPOCH_REACHED: AtomicBool = AtomicBool::new(false); +static QUOTA_REACHED: AtomicBool = AtomicBool::new(false); + +pub fn set_stop_epoch_reached(value: bool) { + STOP_EPOCH_REACHED.store(value, Ordering::SeqCst); +} +pub fn is_stop_epoch_reached() -> bool { + STOP_EPOCH_REACHED.load(Ordering::SeqCst) +} + +pub fn set_quota_reached(value: bool) { + QUOTA_REACHED.store(value, Ordering::SeqCst); +} + +pub fn is_quota_reached() -> bool { + QUOTA_REACHED.load(Ordering::SeqCst) +} + pub type UpstreamPort = gasket::messaging::InputPort; pub enum WorkUnit { @@ -51,12 +72,20 @@ impl Stage { async fn on_roll_forward(&self, block: RawBlock) -> Result<(), WorkerError> { debug!("handling roll forward"); - - dolos_core::facade::roll_forward(&self.domain, block) - .await - .or_panic()?; - - Ok(()) + match dolos_core::facade::roll_forward(&self.domain, block).await { + Ok(_) => Ok(()), + Err(dolos_core::DomainError::StopEpochReached) => { + // indicate stop epoch was reached + set_stop_epoch_reached(true); + // Return a special error to indicate stop epoch was reached + Err(WorkerError::Panic) + } + Err(e) => { + // Convert other domain errors to worker errors + tracing::error!("Domain error: {:?}", e); + Err(WorkerError::Panic) + } + } } fn on_rollback(&self, point: &ChainPoint) -> Result<(), WorkerError> { diff --git a/src/sync/mod.rs b/src/sync/mod.rs index efc74118..4a54f6f0 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -31,7 +31,8 @@ fn define_gasket_policy(config: &Option) -> gasket::runtime::Policy }; gasket::runtime::Policy { - // TODO: we skip checking timeouts to avoid stalling the pipeline on slow work units. The long-term solution is to scope work units to fit within a particular quota. + // TODO: we skip checking timeouts to avoid stalling the pipeline on slow work units. The + // long-term solution is to scope work units to fit within a particular quota. tick_timeout: None, bootstrap_retry: retries.clone(), work_retry: retries.clone(), diff --git a/src/sync/pull.rs b/src/sync/pull.rs index 41f16433..e28f06fa 100644 --- a/src/sync/pull.rs +++ b/src/sync/pull.rs @@ -11,6 +11,7 @@ use pallas::network::miniprotocols::chainsync::{ use pallas::network::miniprotocols::Point; use tracing::{debug, info, warn}; +use super::apply::set_quota_reached; use crate::adapters::WalAdapter; use crate::prelude::*; @@ -40,17 +41,19 @@ pub enum PullQuota { WaitingTip, Unlimited, BlockQuota(u64), - Reached, + ReachedTip, + ReachedBlockQuota, } impl PullQuota { fn should_quit(&self) -> bool { - matches!(self, Self::Reached) + matches!(self, Self::ReachedTip | Self::ReachedBlockQuota) } fn on_tip(&mut self) { if let Self::WaitingTip = self { - *self = Self::Reached; + info!("chain tip reached, sync quota will be satisfied"); + *self = Self::ReachedTip; } } @@ -59,8 +62,13 @@ impl PullQuota { let new = x.saturating_sub(count); if new == 0 { - *self = Self::Reached; + info!( + "block quota of {} blocks has been consumed, sync quota will be satisfied", + x + ); + *self = Self::ReachedBlockQuota; } else { + debug!("block quota: {} blocks remaining", new); *self = Self::BlockQuota(new); } } @@ -165,7 +173,20 @@ impl gasket::framework::Worker for Worker { async fn schedule(&mut self, stage: &mut Stage) -> Result, WorkerError> { if stage.quota.should_quit() { - warn!("quota reached, stopping sync"); + match stage.quota { + PullQuota::ReachedTip => { + info!("sync quota reached: chain tip has been reached, stopping sync as configured"); + set_quota_reached(true); + } + PullQuota::ReachedBlockQuota => { + info!("sync quota reached: maximum number of blocks has been processed, stopping sync as configured"); + set_quota_reached(true); + } + _ => { + warn!("quota reached, stopping sync"); + set_quota_reached(true); + } + } return Ok(WorkSchedule::Done); }