Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions src/bin/dolos/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tracing_subscriber::{filter::Targets, prelude::*};
use dolos::adapters::{DomainAdapter, WalAdapter};
use dolos::core::Genesis;
use dolos::prelude::*;
use dolos::sync::apply::{is_quota_reached, is_stop_epoch_reached};

pub struct Stores {
pub wal: WalAdapter,
Expand Down Expand Up @@ -261,27 +262,51 @@ pub fn hook_exit_token() -> CancellationToken {
cancel
}

pub async fn run_pipeline(pipeline: gasket::daemon::Daemon, exit: CancellationToken) {
/// Reason reported by `run_pipeline` for why the pipeline loop ended.
///
/// The type is a plain fieldless tag, so it is `Copy` and can be freely passed
/// around and compared by callers that translate it into an exit status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStopReason {
    /// Pipeline stopped due to a signal (SIGINT, SIGTERM)
    Signal,
    /// Pipeline stopped due to reaching sync quota limits
    QuotaReached,
    /// Pipeline stopped due to reaching configured stop epoch
    StopEpochReached,
    /// Pipeline stopped for other reasons
    Other,
}

/// Waits for the pipeline to finish or for an exit request, tears the pipeline
/// down and reports why it stopped.
///
/// The pipeline is polled with `should_stop()` every 5 seconds. When it reports
/// that it should stop, the reason is derived from the stop-epoch and quota
/// flags (stop epoch wins when both are set, `Other` when neither is set) and
/// `exit` is cancelled so the remaining stages stop early. When `exit` is
/// cancelled first, the reason is `Signal`.
///
/// In every case `pipeline.teardown()` is called before returning.
pub async fn run_pipeline(
    pipeline: gasket::daemon::Daemon,
    exit: CancellationToken,
) -> PipelineStopReason {
    let stop_reason = loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(5)) => {
                if pipeline.should_stop() {
                    info!("pipeline is stopping gracefully - one or more stages have completed their work");

                    // The stop-epoch flag takes precedence over the quota flag.
                    let reason = if is_stop_epoch_reached() {
                        PipelineStopReason::StopEpochReached
                    } else if is_quota_reached() {
                        PipelineStopReason::QuotaReached
                    } else {
                        PipelineStopReason::Other
                    };

                    // trigger cancel so that stages stop early
                    exit.cancel();
                    break reason;
                }
            }
            _ = exit.cancelled() => {
                info!("exit requested, shutting down pipeline");
                break PipelineStopReason::Signal;
            }
        }
    };

    info!("shutting down pipeline");
    pipeline.teardown();

    stop_reason
}

pub fn cleanup_data(config: &RootConfig) -> Result<(), std::io::Error> {
Expand Down
16 changes: 14 additions & 2 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,21 @@ pub async fn run(config: RootConfig, _args: &Args) -> miette::Result<()> {
}
}

sync.await.unwrap();
let stop_reason = sync.await.unwrap();

warn!("shutdown complete");

Ok(())
// Return appropriate exit code based on why the pipeline stopped
let exit_code = match stop_reason {
crate::common::PipelineStopReason::Signal => 0,
crate::common::PipelineStopReason::QuotaReached => 1,
crate::common::PipelineStopReason::StopEpochReached => 0,
crate::common::PipelineStopReason::Other => 2,
};

if exit_code == 0 {
Ok(())
} else {
std::process::exit(exit_code);
}
}
2 changes: 1 addition & 1 deletion src/bin/dolos/data/summary.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dolos_core::config::RootConfig;
use dolos::cli::{ArchiveSummary, DataSummary, StateSummary, WalSummary};
use dolos::prelude::*;
use dolos_core::config::RootConfig;

// Arguments for this command; the struct declares no fields, so no options are
// accepted. A plain `//` comment is used on purpose: clap's derive macros turn
// `///` doc comments into help text.
#[derive(Debug, clap::Args)]
pub struct Args {}
Expand Down
41 changes: 35 additions & 6 deletions src/sync/apply.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,29 @@
use gasket::{framework::*, messaging::Message};
use std::sync::atomic::{AtomicBool, Ordering};
use tracing::debug;

use crate::{adapters::DomainAdapter, facade::DomainExt as _, prelude::*};

type WorkerError = gasket::framework::WorkerError;

/// Process-wide flag recording that block application hit the configured stop epoch.
static STOP_EPOCH_REACHED: AtomicBool = AtomicBool::new(false);

/// Process-wide flag recording that the pull stage exhausted its sync quota.
static QUOTA_REACHED: AtomicBool = AtomicBool::new(false);

/// Sets the process-wide "stop epoch reached" flag to `value`.
pub fn set_stop_epoch_reached(value: bool) {
    STOP_EPOCH_REACHED.store(value, Ordering::SeqCst);
}

/// Returns the current value of the process-wide "stop epoch reached" flag.
///
/// The flag starts out `false`.
#[must_use]
pub fn is_stop_epoch_reached() -> bool {
    STOP_EPOCH_REACHED.load(Ordering::SeqCst)
}

/// Sets the process-wide "quota reached" flag to `value`.
pub fn set_quota_reached(value: bool) {
    QUOTA_REACHED.store(value, Ordering::SeqCst);
}

/// Returns the current value of the process-wide "quota reached" flag.
///
/// The flag starts out `false`.
#[must_use]
pub fn is_quota_reached() -> bool {
    QUOTA_REACHED.load(Ordering::SeqCst)
}

pub type UpstreamPort = gasket::messaging::InputPort<PullEvent>;

pub enum WorkUnit {
Expand Down Expand Up @@ -51,12 +72,20 @@ impl Stage {

/// Applies a rolled-forward block to the domain.
///
/// The block is handed to `dolos_core::facade::roll_forward` exactly once.
///
/// # Errors
///
/// Every domain error is reported as `WorkerError::Panic`. When the error is
/// `DomainError::StopEpochReached`, the stop-epoch flag is set first so the
/// cause can be told apart from a genuine failure; any other error is logged
/// at error level.
async fn on_roll_forward(&self, block: RawBlock) -> Result<(), WorkerError> {
    debug!("handling roll forward");

    match dolos_core::facade::roll_forward(&self.domain, block).await {
        Ok(_) => Ok(()),
        Err(dolos_core::DomainError::StopEpochReached) => {
            // indicate stop epoch was reached
            set_stop_epoch_reached(true);
            // The stage is still stopped through the generic panic error; the
            // flag above is what carries the actual reason.
            Err(WorkerError::Panic)
        }
        Err(e) => {
            // Convert other domain errors to worker errors
            tracing::error!("Domain error: {:?}", e);
            Err(WorkerError::Panic)
        }
    }
}

fn on_rollback(&self, point: &ChainPoint) -> Result<(), WorkerError> {
Expand Down
3 changes: 2 additions & 1 deletion src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ fn define_gasket_policy(config: &Option<RetryConfig>) -> gasket::runtime::Policy
};

gasket::runtime::Policy {
// TODO: we skip checking timeouts to avoid stalling the pipeline on slow work units. The
// long-term solution is to scope work units to fit within a particular quota.
tick_timeout: None,
bootstrap_retry: retries.clone(),
work_retry: retries.clone(),
Expand Down
31 changes: 26 additions & 5 deletions src/sync/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use pallas::network::miniprotocols::chainsync::{
use pallas::network::miniprotocols::Point;
use tracing::{debug, info, warn};

use super::apply::set_quota_reached;
use crate::adapters::WalAdapter;
use crate::prelude::*;

Expand Down Expand Up @@ -40,17 +41,19 @@ pub enum PullQuota {
WaitingTip,
Unlimited,
BlockQuota(u64),
Reached,
ReachedTip,
ReachedBlockQuota,
}

impl PullQuota {
/// Returns `true` when the quota is in one of its terminal states, i.e.
/// `ReachedTip` or `ReachedBlockQuota`.
fn should_quit(&self) -> bool {
    matches!(self, Self::ReachedTip | Self::ReachedBlockQuota)
}

/// Notifies the quota that the chain tip has been reached.
///
/// Only a quota in the `WaitingTip` state reacts: it logs the event and moves
/// to `ReachedTip`. Every other state is left untouched.
fn on_tip(&mut self) {
    if let Self::WaitingTip = self {
        info!("chain tip reached, sync quota will be satisfied");
        *self = Self::ReachedTip;
    }
}

Expand All @@ -59,8 +62,13 @@ impl PullQuota {
let new = x.saturating_sub(count);

if new == 0 {
*self = Self::Reached;
info!(
"block quota of {} blocks has been consumed, sync quota will be satisfied",
x
);
*self = Self::ReachedBlockQuota;
} else {
debug!("block quota: {} blocks remaining", new);
*self = Self::BlockQuota(new);
}
}
Expand Down Expand Up @@ -165,7 +173,20 @@ impl gasket::framework::Worker<Stage> for Worker {

async fn schedule(&mut self, stage: &mut Stage) -> Result<WorkSchedule<WorkUnit>, WorkerError> {
if stage.quota.should_quit() {
warn!("quota reached, stopping sync");
match stage.quota {
PullQuota::ReachedTip => {
info!("sync quota reached: chain tip has been reached, stopping sync as configured");
set_quota_reached(true);
}
PullQuota::ReachedBlockQuota => {
info!("sync quota reached: maximum number of blocks has been processed, stopping sync as configured");
set_quota_reached(true);
}
_ => {
warn!("quota reached, stopping sync");
set_quota_reached(true);
}
}
return Ok(WorkSchedule::Done);
}

Expand Down