engine/packages/api-builder/src/global_context.rs (1 addition, 1 deletion)
@@ -18,7 +18,7 @@ impl GlobalApiCtx {
name: &'static str,
) -> Result<Self> {
let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
- let db = gas::prelude::db::DatabaseKv::from_pools(pools.clone()).await?;
+ let db = gas::prelude::db::DatabaseKv::new(config.clone(), pools.clone()).await?;

Ok(Self {
db,
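The same constructor change repeats across the callers below: `DatabaseKv::from_pools(pools)` becomes `DatabaseKv::new(config, pools)`, so the KV database layer now receives the config alongside the connection pools. A minimal sketch of the new call shape — the helper function and the `DatabaseDebug` import path are assumptions for illustration, not code from this PR:

```rust
use std::sync::Arc;

use anyhow::Result;
use gas::db::{self, debug::DatabaseDebug};

// Hypothetical helper showing the new constructor shape: the config is threaded
// into `DatabaseKv::new` alongside the pools, where the removed
// `DatabaseKv::from_pools` took only the pools.
async fn build_debug_db(
    config: rivet_config::Config,
    pools: rivet_pools::Pools,
) -> Result<Arc<dyn DatabaseDebug>> {
    Ok(db::DatabaseKv::new(config, pools).await? as Arc<dyn DatabaseDebug>)
}
```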
engine/packages/bootstrap/src/lib.rs (1 addition, 1 deletion)
@@ -3,7 +3,7 @@ use gas::prelude::*;
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
let ctx = StandaloneCtx::new(
- db::DatabaseKv::from_pools(pools.clone()).await?,
+ db::DatabaseKv::new(config.clone(), pools.clone()).await?,
config.clone(),
pools,
cache,
engine/packages/config/src/config/pegboard.rs (3 additions)
@@ -75,6 +75,9 @@ pub struct Pegboard {
///
/// **Experimental**
pub serverless_backoff_max_exponent: Option<usize>,
+
+ /// Global pool desired max.
+ pub pool_desired_max_override: Option<u32>,
}

impl Pegboard {
engine/packages/config/src/config/runtime.rs (3 additions)
@@ -5,6 +5,9 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
pub struct Runtime {
+ /// Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When
+ /// unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu.
+ pub worker_cpu_max: Option<usize>,
/// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM.
/// Defaults to 30 seconds.
worker_shutdown_duration: Option<u32>,
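Like `pool_desired_max_override` above, the new `worker_cpu_max` field is optional and only applied when set. It is expressed in millicores; a small sketch of the unit convention from the doc comment (the fallback to cgroup `cpu.max` and then total host CPU only kicks in when the field is unset):

```rust
// Millicore convention: 1000 millicores = 1 core, so a 2.5-core budget is 2500.
// The value shapes the worker curve around this budget; it is not a hard limit.
fn cores_to_millicores(cores: f64) -> usize {
    (cores * 1000.0) as usize
}

fn main() {
    assert_eq!(cores_to_millicores(2.5), 2500); // e.g. worker_cpu_max: Some(2500)
}
```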
engine/packages/engine/src/commands/wf/mod.rs (39 additions, 2 deletions)
@@ -1,6 +1,6 @@
use std::sync::Arc;

- use anyhow::*;
+ use anyhow::{Result, ensure};
use clap::{Parser, ValueEnum};
use gas::db::{
self, Database,
@@ -32,6 +32,18 @@ pub enum SubCommand {
Silence { workflow_ids: Vec<Id> },
/// Sets the wake immediate property of a workflow to true.
Wake { workflow_ids: Vec<Id> },
+ /// Wakes dead workflows that match the name and error queries.
+ Revive {
+ #[clap(short = 'n', long)]
+ name: Vec<String>,
+ /// Matches via substring (i.e. error = "database" will match workflows that died with error "database transaction failed").
+ #[clap(short = 'e', long)]
+ error: Vec<String>,
+ #[clap(short = 'd', long)]
+ dry_run: bool,
+ #[clap(short = 'p', long)]
+ parallelization: Option<u128>,
+ },
/// Lists the entire event history of a workflow.
History {
#[clap(index = 1)]
@@ -58,7 +70,7 @@
impl SubCommand {
pub async fn execute(self, config: rivet_config::Config) -> Result<()> {
let pools = rivet_pools::Pools::new(config.clone()).await?;
- let db = db::DatabaseKv::from_pools(pools).await? as Arc<dyn DatabaseDebug>;
+ let db = db::DatabaseKv::new(config.clone(), pools).await? as Arc<dyn DatabaseDebug>;

match self {
Self::Get { workflow_ids } => {
@@ -85,6 +97,31 @@ impl SubCommand {
}
Self::Silence { workflow_ids } => db.silence_workflows(workflow_ids).await,
Self::Wake { workflow_ids } => db.wake_workflows(workflow_ids).await,
+ Self::Revive {
+ name,
+ error,
+ dry_run,
+ parallelization,
+ } => {
+ ensure!(!name.is_empty(), "must provide at least one name");
+
+ let total = db
+ .revive_workflows(
+ &name.iter().map(|x| x.as_str()).collect::<Vec<_>>(),
+ &error.iter().map(|x| x.as_str()).collect::<Vec<_>>(),
+ dry_run,
+ parallelization.unwrap_or(1),
+ )
+ .await?;
+
+ if dry_run {
+ rivet_term::status::success("Workflows Matched", total);
+ } else {
+ rivet_term::status::success("Workflows Revived", total);
+ }
+
+ Ok(())
+ }
Self::History {
workflow_id,
exclude_json,
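The `--error` filter documented on the new `Revive` variant matches by substring. A hedged, self-contained sketch of that matching rule — the real logic lives behind `DatabaseDebug::revive_workflows`, and treating an empty error filter as matching everything is an assumption here:

```rust
// Illustrative only: substring matching as described by the `-e, --error` doc comment.
fn error_matches(workflow_error: &str, error_like: &[&str]) -> bool {
    // Assumed: with no error filters, any dead workflow with a matching name qualifies.
    error_like.is_empty() || error_like.iter().any(|frag| workflow_error.contains(frag))
}

fn main() {
    assert!(error_matches("database transaction failed", &["database"]));
    assert!(!error_matches("actor exited", &["database", "timeout"]));
}
```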
engine/packages/epoxy/src/ops/propose.rs (10 additions, 5 deletions)
@@ -38,7 +38,8 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
.udb()?
.run(|tx| async move { utils::read_config(&tx, replica_id).await })
.custom_instrument(tracing::info_span!("read_config_tx"))
- .await?;
+ .await
+ .context("failed reading config")?;

// Lead consensus
let payload = ctx
@@ -48,7 +49,8 @@
async move { replica::lead_consensus::lead_consensus(&*tx, replica_id, proposal).await }
})
.custom_instrument(tracing::info_span!("lead_consensus_tx"))
- .await?;
+ .await
+ .context("failed leading consensus")?;

// Get quorum members (only active replicas for voting)
let quorum_members = utils::get_quorum_members(&config);
@@ -66,7 +68,8 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
async move { replica::decide_path::decide_path(&*tx, pre_accept_oks, &payload) }
})
.custom_instrument(tracing::info_span!("decide_path_tx"))
- .await?;
+ .await
+ .context("failed deciding path")?;

match path {
Path::PathFast(protocol::PathFast { payload }) => {
@@ -105,7 +108,8 @@ pub async fn run_paxos_accept(
async move { replica::messages::accepted(&*tx, replica_id, payload).await }
})
.custom_instrument(tracing::info_span!("accept_tx"))
- .await?;
+ .await
+ .context("failed accepting")?;

// EPaxos Step 17
let quorum = send_accepts(
@@ -150,7 +154,8 @@ pub async fn commit(
}
})
.custom_instrument(tracing::info_span!("committed_tx"))
- .await?
+ .await
+ .context("failed committing")?
};

// EPaxos Step 23
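Each hunk above replaces a bare `?` with anyhow's `.context(...)`, so a failing transaction reports which step failed in addition to the underlying error. A self-contained sketch of the pattern outside this codebase:

```rust
use anyhow::{Context, Result};

// Illustrative stand-in: `.context(...)` prepends a label to the error chain, so
// the failure reads "failed reading config: invalid digit found in string".
fn read_port(raw: &str) -> Result<u16> {
    raw.parse::<u16>().context("failed reading config")
}

fn main() {
    let err = read_port("not-a-number").unwrap_err();
    println!("{err:#}");
}
```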
engine/packages/epoxy/src/workflows/purger.rs (2 additions, 2 deletions)
@@ -22,11 +22,11 @@ pub async fn epoxy_purger(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
let replica_id = input.replica_id;

async move {
- let sig = ctx.listen::<Purge>().await?;
+ let signals = ctx.listen_n::<Purge>(1024).await?;

ctx.activity(PurgeInput {
replica_id,
- keys: sig.keys,
+ keys: signals.into_iter().flat_map(|sig| sig.keys).collect(),
})
.await?;

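The purger now drains up to 1024 queued `Purge` signals per iteration and flattens their key lists into a single activity input, instead of running one purge activity per signal. A tiny illustration of the flattening, using a stand-in type rather than the real epoxy signal:

```rust
// Stand-in signal type; only the `keys` field mirrors the diff above.
struct Purge {
    keys: Vec<String>,
}

fn main() {
    let signals = vec![
        Purge { keys: vec!["k1".into()] },
        Purge { keys: vec!["k2".into(), "k3".into()] },
    ];
    // Same shape as the new activity input: one flat key list for the whole batch.
    let keys: Vec<String> = signals.into_iter().flat_map(|sig| sig.keys).collect();
    assert_eq!(keys, ["k1", "k2", "k3"]);
}
```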
engine/packages/gasoline/src/ctx/test.rs (3 additions, 2 deletions)
@@ -55,8 +55,9 @@ impl TestCtx {
let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())
.expect("failed to create cache");

- let db = db::DatabaseKv::from_pools(pools.clone()).await?;
- let debug_db = db::DatabaseKv::from_pools(pools.clone()).await? as Arc<dyn DatabaseDebug>;
+ let db = db::DatabaseKv::new(config.clone(), pools.clone()).await?;
+ let debug_db =
+ db::DatabaseKv::new(config.clone(), pools.clone()).await? as Arc<dyn DatabaseDebug>;

let service_name = format!("{}-test--{}", rivet_env::service_name(), "gasoline_test");
let ray_id = Id::new_v1(config.dc_label());
engine/packages/gasoline/src/ctx/workflow.rs (12 additions, 9 deletions)
@@ -301,7 +301,7 @@ impl WorkflowCtx {

let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input).in_current_span())
.await
- .map_err(|_| WorkflowError::ActivityTimeout(0));
+ .map_err(|_| WorkflowError::ActivityTimeout(A::NAME, 0));

let dt = start_instant.elapsed().as_secs_f64();

@@ -401,7 +401,7 @@
],
);

- Err(WorkflowError::ActivityFailure(err, 0))
+ Err(WorkflowError::ActivityFailure(A::NAME, err, 0))
}
Err(err) => {
tracing::debug!("activity timeout");
@@ -604,25 +604,28 @@ impl WorkflowCtx {
// Convert error in the case of max retries exceeded. This will only act on retryable
// errors
let err = match err {
- WorkflowError::ActivityFailure(err, _) => {
+ WorkflowError::ActivityFailure(name, err, _) => {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
- WorkflowError::ActivityMaxFailuresReached(err)
+ WorkflowError::ActivityMaxFailuresReached(name, err)
} else {
// Add error count to the error for backoff calculation
- WorkflowError::ActivityFailure(err, error_count)
+ WorkflowError::ActivityFailure(name, err, error_count)
}
}
- WorkflowError::ActivityTimeout(_) => {
+ WorkflowError::ActivityTimeout(name, _) => {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
- WorkflowError::ActivityMaxFailuresReached(err.into())
+ WorkflowError::ActivityMaxFailuresReached(name, err.into())
} else {
// Add error count to the error for backoff calculation
- WorkflowError::ActivityTimeout(error_count)
+ WorkflowError::ActivityTimeout(name, error_count)
}
}
WorkflowError::OperationTimeout(op_name, _) => {
if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
- WorkflowError::ActivityMaxFailuresReached(err.into())
+ WorkflowError::ActivityMaxFailuresReached(
+ I::Activity::NAME,
+ err.into(),
+ )
} else {
// Add error count to the error for backoff calculation
WorkflowError::OperationTimeout(op_name, error_count)
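These hunks thread the activity name (`A::NAME` / `I::Activity::NAME`) through the activity error variants so retries and terminal failures can be attributed to a specific activity. A hedged sketch of the variant shape the diff implies — only the added name parameter comes from the diff; the field types and the remaining variants are guesses:

```rust
// Sketch only; field types other than the added `&'static str` name are assumed.
pub enum WorkflowError {
    ActivityFailure(&'static str, anyhow::Error, usize),
    ActivityTimeout(&'static str, usize),
    ActivityMaxFailuresReached(&'static str, anyhow::Error),
    OperationTimeout(&'static str, usize),
    // ...other variants unchanged by this PR
}
```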
engine/packages/gasoline/src/db/debug.rs (9 additions, 1 deletion)
@@ -1,4 +1,4 @@
- use anyhow::*;
+ use anyhow::Result;
use rivet_util::Id;

use super::Database;
@@ -39,6 +39,14 @@ pub trait DatabaseDebug: Database {
) -> Result<Vec<SignalData>>;

async fn silence_signals(&self, signal_ids: Vec<Id>) -> Result<()>;
+
+ async fn revive_workflows(
+ &self,
+ names: &[&str],
+ error_like: &[&str],
+ dry_run: bool,
+ parallelization: u128,
+ ) -> Result<usize>;
}

#[derive(Debug)]