diff --git a/engine/packages/api-builder/src/global_context.rs b/engine/packages/api-builder/src/global_context.rs
index 62c17f366d..bac1f6aa07 100644
--- a/engine/packages/api-builder/src/global_context.rs
+++ b/engine/packages/api-builder/src/global_context.rs
@@ -18,7 +18,7 @@ impl GlobalApiCtx {
 		name: &'static str,
 	) -> Result<Self> {
 		let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
-		let db = gas::prelude::db::DatabaseKv::from_pools(pools.clone()).await?;
+		let db = gas::prelude::db::DatabaseKv::new(config.clone(), pools.clone()).await?;
 
 		Ok(Self {
 			db,
diff --git a/engine/packages/bootstrap/src/lib.rs b/engine/packages/bootstrap/src/lib.rs
index 35088c2d30..f74f8624a6 100644
--- a/engine/packages/bootstrap/src/lib.rs
+++ b/engine/packages/bootstrap/src/lib.rs
@@ -3,7 +3,7 @@ use gas::prelude::*;
 pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
 	let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
 	let ctx = StandaloneCtx::new(
-		db::DatabaseKv::from_pools(pools.clone()).await?,
+		db::DatabaseKv::new(config.clone(), pools.clone()).await?,
 		config.clone(),
 		pools,
 		cache,
diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs
index 308d807bb7..97eeda02a6 100644
--- a/engine/packages/config/src/config/pegboard.rs
+++ b/engine/packages/config/src/config/pegboard.rs
@@ -75,6 +75,9 @@ pub struct Pegboard {
 	///
 	/// **Experimental**
 	pub serverless_backoff_max_exponent: Option,
+
+	/// Global override for the desired max of serverless runner pools.
+	pub pool_desired_max_override: Option,
 }
 
 impl Pegboard {
diff --git a/engine/packages/config/src/config/runtime.rs b/engine/packages/config/src/config/runtime.rs
index 489b95507d..cde4350bc2 100644
--- a/engine/packages/config/src/config/runtime.rs
+++ b/engine/packages/config/src/config/runtime.rs
@@ -5,6 +5,9 @@ use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
 pub struct Runtime {
+	/// Adjusts the worker load-shedding curve around this value (in millicores, i.e. 1000 = 1 core).
+	/// This is not a hard limit. When unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses
+	/// total host cpu.
+	pub worker_cpu_max: Option,
 	/// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM.
 	/// Defaults to 30 seconds.
 	worker_shutdown_duration: Option,
diff --git a/engine/packages/engine/src/commands/wf/mod.rs b/engine/packages/engine/src/commands/wf/mod.rs
index e45ab83a22..0fbcf511c5 100644
--- a/engine/packages/engine/src/commands/wf/mod.rs
+++ b/engine/packages/engine/src/commands/wf/mod.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use anyhow::*;
+use anyhow::{Result, ensure};
 use clap::{Parser, ValueEnum};
 use gas::db::{
 	self, Database,
@@ -32,6 +32,18 @@ pub enum SubCommand {
 	Silence { workflow_ids: Vec<Id> },
 	/// Sets the wake immediate property of a workflow to true.
 	Wake { workflow_ids: Vec<Id> },
+	/// Wakes dead workflows that match the name and error queries.
+	Revive {
+		#[clap(short = 'n', long)]
+		name: Vec<String>,
+		/// Matches via substring (e.g. error = "database" will match workflows that died with the
+		/// error "database transaction failed").
+		#[clap(short = 'e', long)]
+		error: Vec<String>,
+		#[clap(short = 'd', long)]
+		dry_run: bool,
+		#[clap(short = 'p', long)]
+		parallelization: Option<u128>,
+	},
 	/// Lists the entire event history of a workflow.
 	History {
 		#[clap(index = 1)]
@@ -58,7 +70,7 @@ pub enum SubCommand {
 impl SubCommand {
 	pub async fn execute(self, config: rivet_config::Config) -> Result<()> {
 		let pools = rivet_pools::Pools::new(config.clone()).await?;
-		let db = db::DatabaseKv::from_pools(pools).await? as Arc<dyn DatabaseDebug>;
+		let db = db::DatabaseKv::new(config.clone(), pools).await? as Arc<dyn DatabaseDebug>;
 
 		match self {
 			Self::Get { workflow_ids } => {
@@ -85,6 +97,31 @@ impl SubCommand {
 			}
 			Self::Silence { workflow_ids } => db.silence_workflows(workflow_ids).await,
 			Self::Wake { workflow_ids } => db.wake_workflows(workflow_ids).await,
+			Self::Revive {
+				name,
+				error,
+				dry_run,
+				parallelization,
+			} => {
+				ensure!(!name.is_empty(), "must provide at least one name");
+
+				let total = db
+					.revive_workflows(
+						&name.iter().map(|x| x.as_str()).collect::<Vec<_>>(),
+						&error.iter().map(|x| x.as_str()).collect::<Vec<_>>(),
+						dry_run,
+						parallelization.unwrap_or(1),
+					)
+					.await?;
+
+				if dry_run {
+					rivet_term::status::success("Workflows Matched", total);
+				} else {
+					rivet_term::status::success("Workflows Revived", total);
+				}
+
+				Ok(())
+			}
 			Self::History {
 				workflow_id,
 				exclude_json,
diff --git a/engine/packages/epoxy/src/ops/propose.rs b/engine/packages/epoxy/src/ops/propose.rs
index 81ced977ae..73aff0881d 100644
--- a/engine/packages/epoxy/src/ops/propose.rs
+++ b/engine/packages/epoxy/src/ops/propose.rs
@@ -38,7 +38,8 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result Result Result {
@@ -105,7 +108,8 @@ pub async fn run_paxos_accept(
 			async move { replica::messages::accepted(&*tx, replica_id, payload).await }
 		})
 		.custom_instrument(tracing::info_span!("accept_tx"))
-		.await?;
+		.await
+		.context("failed accepting")?;
 
 	// EPaxos Step 17
 	let quorum = send_accepts(
@@ -150,7 +154,8 @@ pub async fn commit(
 				}
 			})
 			.custom_instrument(tracing::info_span!("committed_tx"))
-			.await?
+			.await
+			.context("failed committing")?
 	};
 
 	// EPaxos Step 23
diff --git a/engine/packages/epoxy/src/workflows/purger.rs b/engine/packages/epoxy/src/workflows/purger.rs
index f68339b349..2ca69a004f 100644
--- a/engine/packages/epoxy/src/workflows/purger.rs
+++ b/engine/packages/epoxy/src/workflows/purger.rs
@@ -22,11 +22,11 @@ pub async fn epoxy_purger(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
 		let replica_id = input.replica_id;
 
 		async move {
-			let sig = ctx.listen::().await?;
+			let signals = ctx.listen_n::(1024).await?;
 
 			ctx.activity(PurgeInput {
 				replica_id,
-				keys: sig.keys,
+				keys: signals.into_iter().flat_map(|sig| sig.keys).collect(),
 			})
 			.await?;
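The purger change above swaps a one-signal `listen` for `listen_n`, so a backlog of purge signals is coalesced into a single purge activity instead of one activity (and one history event) per signal. A minimal sketch of the batching pattern, assuming a hypothetical `Purge { keys: Vec<Key> }` signal type:

```rust
// Drain up to 1024 queued Purge signals in one wake-up, then merge their
// keys into a single activity input (signal and key types are hypothetical).
let signals = ctx.listen_n::<Purge>(1024).await?;
let keys: Vec<Key> = signals.into_iter().flat_map(|sig| sig.keys).collect();
ctx.activity(PurgeInput { replica_id, keys }).await?;
```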
diff --git a/engine/packages/gasoline/src/ctx/test.rs b/engine/packages/gasoline/src/ctx/test.rs
index 1479fe193a..dd2d12ea0a 100644
--- a/engine/packages/gasoline/src/ctx/test.rs
+++ b/engine/packages/gasoline/src/ctx/test.rs
@@ -55,8 +55,9 @@ impl TestCtx {
 		let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())
 			.expect("failed to create cache");
 
-		let db = db::DatabaseKv::from_pools(pools.clone()).await?;
-		let debug_db = db::DatabaseKv::from_pools(pools.clone()).await? as Arc<dyn DatabaseDebug>;
+		let db = db::DatabaseKv::new(config.clone(), pools.clone()).await?;
+		let debug_db =
+			db::DatabaseKv::new(config.clone(), pools.clone()).await? as Arc<dyn DatabaseDebug>;
 
 		let service_name = format!("{}-test--{}", rivet_env::service_name(), "gasoline_test");
 		let ray_id = Id::new_v1(config.dc_label());
diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs
index f5af928f6a..2078ac30bc 100644
--- a/engine/packages/gasoline/src/ctx/workflow.rs
+++ b/engine/packages/gasoline/src/ctx/workflow.rs
@@ -301,7 +301,7 @@ impl WorkflowCtx {
 
 		let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input).in_current_span())
 			.await
-			.map_err(|_| WorkflowError::ActivityTimeout(0));
+			.map_err(|_| WorkflowError::ActivityTimeout(A::NAME, 0));
 
 		let dt = start_instant.elapsed().as_secs_f64();
 
@@ -401,7 +401,7 @@ impl WorkflowCtx {
 					],
 				);
 
-				Err(WorkflowError::ActivityFailure(err, 0))
+				Err(WorkflowError::ActivityFailure(A::NAME, err, 0))
 			}
 			Err(err) => {
 				tracing::debug!("activity timeout");
@@ -604,25 +604,28 @@ impl WorkflowCtx {
 				// Convert error in the case of max retries exceeded. This will only act on retryable
 				// errors
 				let err = match err {
-					WorkflowError::ActivityFailure(err, _) => {
+					WorkflowError::ActivityFailure(name, err, _) => {
 						if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
-							WorkflowError::ActivityMaxFailuresReached(err)
+							WorkflowError::ActivityMaxFailuresReached(name, err)
 						} else {
 							// Add error count to the error for backoff calculation
-							WorkflowError::ActivityFailure(err, error_count)
+							WorkflowError::ActivityFailure(name, err, error_count)
 						}
 					}
-					WorkflowError::ActivityTimeout(_) => {
+					WorkflowError::ActivityTimeout(name, _) => {
 						if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
-							WorkflowError::ActivityMaxFailuresReached(err.into())
+							WorkflowError::ActivityMaxFailuresReached(name, err.into())
 						} else {
 							// Add error count to the error for backoff calculation
-							WorkflowError::ActivityTimeout(error_count)
+							WorkflowError::ActivityTimeout(name, error_count)
 						}
 					}
 					WorkflowError::OperationTimeout(op_name, _) => {
 						if error_count.saturating_add(1) >= I::Activity::MAX_RETRIES {
-							WorkflowError::ActivityMaxFailuresReached(err.into())
+							WorkflowError::ActivityMaxFailuresReached(
+								I::Activity::NAME,
+								err.into(),
+							)
 						} else {
 							// Add error count to the error for backoff calculation
 							WorkflowError::OperationTimeout(op_name, error_count)
diff --git a/engine/packages/gasoline/src/db/debug.rs b/engine/packages/gasoline/src/db/debug.rs
index 5b37dabe46..524f8101b5 100644
--- a/engine/packages/gasoline/src/db/debug.rs
+++ b/engine/packages/gasoline/src/db/debug.rs
@@ -1,4 +1,4 @@
-use anyhow::*;
+use anyhow::Result;
 use rivet_util::Id;
 
 use super::Database;
@@ -39,6 +39,14 @@ pub trait DatabaseDebug: Database {
 	) -> Result>;
 
 	async fn silence_signals(&self, signal_ids: Vec<Id>) -> Result<()>;
+
+	async fn revive_workflows(
+		&self,
+		names: &[&str],
+		error_like: &[&str],
+		dry_run: bool,
+		parallelization: u128,
+	) -> Result<usize>;
 }
 
 #[derive(Debug)]
diff --git a/engine/packages/gasoline/src/db/kv/debug.rs b/engine/packages/gasoline/src/db/kv/debug.rs
index 28ae6793f5..b4b214cccf 100644
--- a/engine/packages/gasoline/src/db/kv/debug.rs
+++ b/engine/packages/gasoline/src/db/kv/debug.rs
@@ -2,10 +2,11 @@ use std::{
 	collections::HashMap,
 	ops::Deref,
 	result::Result::{Err, Ok},
+	time::Duration,
 };
 
 use anyhow::{Context, Result, ensure};
-use futures_util::{StreamExt, TryStreamExt};
+use futures_util::{StreamExt, TryStreamExt, stream::FuturesUnordered};
 use rivet_util::Id;
 use tracing::Instrument;
 use universaldb::utils::{FormalChunkedKey, FormalKey, IsolationLevel::*, end_of_key_range};
@@ -15,6 +16,7 @@ use universaldb::{
 	tuple::{PackResult, TupleDepth, TupleUnpack},
 	value::Value,
 };
+use uuid::Uuid;
 
 use super::{DatabaseKv, keys, update_metric};
 use crate::{
@@ -33,6 +35,8 @@ use crate::{
 	},
 };
 
+const EARLY_TXN_TIMEOUT: Duration = Duration::from_secs(3);
+
 impl DatabaseKv {
 	#[tracing::instrument(skip_all)]
 	async fn get_workflows_inner(
@@ -688,7 +692,10 @@ impl DatabaseDebug for DatabaseKv {
 				continue;
 			}
 
-			ensure!(!has_output, "cannot wake a completed workflow");
+			if has_output {
+				tracing::warn!("cannot wake a completed workflow");
+				continue;
+			}
 
 			tx.write(
 				&keys::wake::WorkflowWakeConditionKey::new(
@@ -1193,6 +1200,196 @@ impl DatabaseDebug for DatabaseKv {
 			.await
 			.map_err(Into::into)
 	}
+
+	#[tracing::instrument(skip_all)]
+	async fn revive_workflows(
+		&self,
+		names: &[&str],
+		error_like: &[&str],
+		dry_run: bool,
+		parallelization: u128,
+	) -> Result<usize> {
+		ensure!(parallelization > 0);
+		ensure!(parallelization < 1024);
+
+		let chunk_size = u128::MAX / parallelization;
+		let mut futs = FuturesUnordered::new();
+
+		for i in 0..parallelization {
+			let start = i * chunk_size;
+			// Fold the division remainder into the last range so the tail of the keyspace is
+			// not skipped
+			let end = if i == parallelization - 1 {
+				u128::MAX
+			} else {
+				start + chunk_size
+			};
+
+			futs.push(self.revive_workflows_inner(names, error_like, dry_run, start, end));
+		}
+
+		let mut total = 0;
+		while let Some(res) = futs.next().await {
+			total += res?;
+		}
+
+		tracing::info!(?total, "workflows revived");
+
+		Ok(total)
+	}
+}
+
+impl DatabaseKv {
+	pub async fn revive_workflows_inner(
+		&self,
+		names: &[&str],
+		error_like: &[&str],
+		dry_run: bool,
+		start: u128,
+		end: u128,
+	) -> Result<usize> {
+		let mut total = 0;
+		let mut current_workflow_id = Some(Id::v1(Uuid::from_u128(start), self.config.dc_label()));
+		let end_workflow_id = Id::v1(Uuid::from_u128(end), self.config.dc_label());
+
+		loop {
+			let (new_current_workflow_id, workflow_ids) = self
+				.pools
+				.udb()?
+				.run(|tx| {
+					async move {
+						let mut workflow_ids = Vec::new();
+
+						let key_end = keys::workflow::DataSubspaceKey::new_with_workflow_id(end_workflow_id);
+						let end = self.subspace.subspace(&key_end).range().1;
+
+						let start = if let Some(current_workflow_id) = current_workflow_id {
+							let key_start = keys::workflow::DataSubspaceKey::new_with_workflow_id(current_workflow_id);
+							let mut bytes = self.subspace.subspace(&key_start).range().0;
+
+							if let Some(b) = bytes.last_mut() {
+								*b = 255;
+							}
+
+							bytes
+						} else {
+							let entire_subspace_key = keys::workflow::DataSubspaceKey::new();
+
+							self.subspace.subspace(&entire_subspace_key).range().0
+						};
+
+						let mut stream = tx.get_ranges_keyvalues(
+							RangeOption {
+								mode: StreamingMode::WantAll,
+								..(start, end).into()
+							},
+							Snapshot,
+						);
+
+						let mut workflows_processed = 0;
+						let mut current_workflow_id = None;
+						let mut name_matches = false;
+						let mut state_matches = true;
+						let mut error_matches = error_like.is_empty();
+
+						let fut = async {
+							while let Some(entry) = stream.try_next().await? {
+								let workflow_id = *self.subspace.unpack::(entry.key())?;
+
+								if let Some(curr) = current_workflow_id {
+									if workflow_id != curr {
+										// Save if matches query
+										if name_matches && state_matches && error_matches {
+											workflow_ids.push(curr);
+										}
+
+										workflows_processed += 1;
+
+										// Reset state
+										name_matches = false;
+										state_matches = true;
+										error_matches = error_like.is_empty();
+									}
+								}
+
+								current_workflow_id = Some(workflow_id);
+
+								if let Ok(name_key) =
+									self.subspace.unpack::(entry.key())
+								{
+									let workflow_name = name_key.deserialize(entry.value())?;
+
+									name_matches = names.iter().any(|name| &workflow_name == name);
+								} else if let Ok(_) = self
+									.subspace
+									.unpack::(entry.key())
+								{
+									state_matches = false;
+								} else if let Ok(_) = self
+									.subspace
+									.unpack::(entry.key())
+								{
+									state_matches = false;
+								} else if let Ok(_) = self
+									.subspace
+									.unpack::(entry.key())
+								{
+									state_matches = false;
+								} else if let Ok(_) = self
+									.subspace
+									.unpack::(entry.key())
+								{
+									state_matches = false;
+								} else if let Ok(error_key) = self
+									.subspace
+									.unpack::(entry.key())
+								{
+									let error = error_key.deserialize(entry.value())?.to_lowercase();
+
+									error_matches = error_like.is_empty() || error_like.iter().any(|err| error.contains(err));
+								}
+							}
+
+							if let (Some(workflow_id), true) = (
+								current_workflow_id,
+								name_matches && state_matches && error_matches,
+							) {
+								workflow_ids.push(workflow_id);
+								current_workflow_id = None;
+							}
+
+							if current_workflow_id.is_some() {
+								workflows_processed += 1;
+							}
+
+							anyhow::Ok(())
+						};
+
+						match tokio::time::timeout(EARLY_TXN_TIMEOUT, fut).await {
+							Ok(res) => res?,
+							Err(_) => tracing::debug!("timed out reading workflows"),
+						}
+
+						tracing::info!(?workflows_processed, matching_workflows=?workflow_ids.len(), "batch processed workflows");
+
+						Ok((current_workflow_id, workflow_ids))
+					}
+				})
+				.instrument(tracing::info_span!("find_dead_workflows_tx"))
+				.await?;
+
+			current_workflow_id = new_current_workflow_id;
+			total += workflow_ids.len();
+
+			if !dry_run {
+				self.wake_workflows(workflow_ids).await?;
+			}
+
+			if current_workflow_id.is_none() {
+				tracing::info!("reached end of workflows");
+				break;
+			}
+		}
+
+		Ok(total)
+	}
+}
 
 // Parses Id in third position, ignores the rest
diff --git a/engine/packages/gasoline/src/db/kv/keys/workflow.rs b/engine/packages/gasoline/src/db/kv/keys/workflow.rs
index 494e26ff52..4be980b6a9 100644
--- a/engine/packages/gasoline/src/db/kv/keys/workflow.rs
+++ b/engine/packages/gasoline/src/db/kv/keys/workflow.rs
@@ -1106,11 +1106,19 @@ impl<'de> TupleUnpack<'de> for WorkerIdKey {
 	}
 }
 
-pub struct DataSubspaceKey {}
+pub struct DataSubspaceKey {
+	workflow_id: Option<Id>,
+}
 
 impl DataSubspaceKey {
 	pub fn new() -> Self {
-		DataSubspaceKey {}
+		DataSubspaceKey { workflow_id: None }
+	}
+
+	pub fn new_with_workflow_id(workflow_id: Id) -> Self {
+		DataSubspaceKey {
+			workflow_id: Some(workflow_id),
+		}
 	}
 }
 
@@ -1120,8 +1128,16 @@ impl TuplePack for DataSubspaceKey {
 		w: &mut W,
 		tuple_depth: TupleDepth,
 	) -> std::io::Result<VersionstampOffset> {
+		let mut offset = VersionstampOffset::None { size: 0 };
+
 		let t = (WORKFLOW, DATA);
-		t.pack(w, tuple_depth)
+		offset += t.pack(w, tuple_depth)?;
+
+		if let Some(workflow_id) = &self.workflow_id {
+			offset += workflow_id.pack(w, tuple_depth)?;
+		}
+
+		Ok(offset)
 	}
 }
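The revive scan parallelizes by carving the 128-bit UUID space that workflow ids sort under into contiguous ranges, one transaction loop per range, which is why `DataSubspaceKey` gains an optional workflow id prefix above. A self-contained sketch of the range math used by `revive_workflows`:

```rust
// Split the u128 keyspace into `parallelization` contiguous [start, end)
// ranges; the division remainder is folded into the last range so the tail
// of the keyspace is not skipped.
fn scan_ranges(parallelization: u128) -> Vec<(u128, u128)> {
    assert!(parallelization > 0);
    let chunk_size = u128::MAX / parallelization;
    (0..parallelization)
        .map(|i| {
            let start = i * chunk_size;
            let end = if i == parallelization - 1 {
                u128::MAX
            } else {
                start + chunk_size
            };
            (start, end)
        })
        .collect()
}
```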
diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs
index 2c39aa9c62..2e35ead33f 100644
--- a/engine/packages/gasoline/src/db/kv/mod.rs
+++ b/engine/packages/gasoline/src/db/kv/mod.rs
@@ -5,7 +5,7 @@ use std::{
 	collections::{HashMap, HashSet},
 	hash::{DefaultHasher, Hash, Hasher},
 	sync::Arc,
-	time::Instant,
+	time::{Duration, Instant},
 };
 
 use anyhow::{Context, Result, ensure};
@@ -48,8 +48,10 @@ mod system;
 const WORKER_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30);
 /// How long before overwriting an existing metrics lock.
 const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30);
+const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);
 
 pub struct DatabaseKv {
+	config: rivet_config::Config,
 	pools: rivet_pools::Pools,
 	subspace: universaldb::utils::Subspace,
 	system: Mutex<system::SystemInfo>,
@@ -417,8 +419,12 @@ impl Database for DatabaseKv {
 		std::time::Duration::from_secs(4)
 	}
 
-	async fn from_pools(pools: rivet_pools::Pools) -> anyhow::Result<Arc<Self>> {
+	async fn new(
+		config: rivet_config::Config,
+		pools: rivet_pools::Pools,
+	) -> anyhow::Result<Arc<Self>> {
 		Ok(Arc::new(DatabaseKv {
+			config,
 			pools,
 			subspace: universaldb::utils::Subspace::new(&(RIVET, GASOLINE, KV)),
 			system: Mutex::new(system::SystemInfo::new()),
@@ -436,6 +442,7 @@ impl Database for DatabaseKv {
 			.map_err(WorkflowError::PoolsGeneric)?
 			.subscribe(&subjects::convert(subject))
 			.await
+			.context("failed to subscribe to bump sub")
 			.map_err(|x| WorkflowError::CreateSubscription(x.into()))?;
 
 		let stream = async_stream::stream! {
@@ -463,6 +470,7 @@ impl Database for DatabaseKv {
 			.map_err(WorkflowError::PoolsGeneric)?
 			.run(|tx| {
 				async move {
+					let start = Instant::now();
 					let now = rivet_util::timestamp::now();
 					let mut last_ping_cache: Vec<(Id, i64)> = Vec::new();
@@ -484,7 +492,16 @@ impl Database for DatabaseKv {
 						Snapshot,
 					);
 
-					while let Some(lease_key_entry) = stream.try_next().await? {
+					loop {
+						if start.elapsed() > EARLY_TXN_TIMEOUT {
+							tracing::warn!("timed out processing expired leases");
+							break;
+						}
+
+						let Some(lease_key_entry) = stream.try_next().await? else {
+							break;
+						};
+
 						let lease_key = self
 							.subspace
 							.unpack::(lease_key_entry.key())?;
@@ -580,6 +597,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("clear_expired_leases_tx"))
 			.await
+			.context("failed to clear expired leases")
 			.map_err(WorkflowError::Udb)?;
 
 		if expired_workflow_count != 0 {
@@ -630,6 +648,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("acquire_lock_tx"))
 			.await
+			.context("failed to acquire metrics lock")
 			.map_err(WorkflowError::Udb)?;
 
 		if acquired_lock {
@@ -659,6 +678,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("read_metrics_tx"))
 			.await
+			.context("failed to read metrics")
 			.map_err(WorkflowError::Udb)?;
 
 			let mut total_workflow_counts: Vec<(String, usize)> = Vec::new();
@@ -675,9 +695,9 @@ impl Database for DatabaseKv {
 						.iter_mut()
 						.find(|(name, _)| name == &workflow_name)
 					{
-						entry.1 += 1;
+						entry.1 += count;
 					} else {
-						total_workflow_counts.push((workflow_name, 1));
+						total_workflow_counts.push((workflow_name, count));
 					}
 				}
 				keys::metric::GaugeMetric::WorkflowSleeping(workflow_name) => {
@@ -690,9 +710,9 @@ impl Database for DatabaseKv {
 						.iter_mut()
 						.find(|(name, _)| name == &workflow_name)
 					{
-						entry.1 += 1;
+						entry.1 += count;
 					} else {
-						total_workflow_counts.push((workflow_name, 1));
+						total_workflow_counts.push((workflow_name, count));
 					}
 				}
 				keys::metric::GaugeMetric::WorkflowDead(workflow_name, error) => {
@@ -708,9 +728,9 @@ impl Database for DatabaseKv {
 						.iter_mut()
 						.find(|(name, _)| name == &workflow_name)
 					{
-						entry.1 += 1;
+						entry.1 += count;
 					} else {
-						total_workflow_counts.push((workflow_name, 1));
+						total_workflow_counts.push((workflow_name, count));
 					}
 				}
 				keys::metric::GaugeMetric::WorkflowComplete(workflow_name) => {
@@ -718,9 +738,9 @@ impl Database for DatabaseKv {
 						.iter_mut()
 						.find(|(name, _)| name == &workflow_name)
 					{
-						entry.1 += 1;
+						entry.1 += count;
 					} else {
-						total_workflow_counts.push((workflow_name, 1));
+						total_workflow_counts.push((workflow_name, count));
 					}
 				}
 				keys::metric::GaugeMetric::SignalPending(signal_name) => {
@@ -749,6 +769,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("clear_lock_tx"))
 			.await
+			.context("failed to release metrics lock")
 			.map_err(WorkflowError::Udb)?;
 		}
 
@@ -789,6 +810,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("update_worker_ping_tx"))
 			.await
+			.context("failed to update worker ping")
 			.map_err(WorkflowError::Udb)?;
 
 		Ok(())
@@ -814,6 +836,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("mark_worker_inactive_tx"))
 			.await
+			.context("failed to mark worker inactive")
 			.map_err(WorkflowError::Udb)?;
 
 		Ok(())
@@ -1010,9 +1033,26 @@ impl Database for DatabaseKv {
 
 		// Determine load shedding ratio based on linear mapping on cpu usage. We will gradually
 		// pull less workflows as the cpu usage increases
-		let cpu_usage = { self.system.lock().await.cpu_usage() };
+		//
+		//        |     .   .
+		// 100%   |_____ .
+		//        |     .\  .
+		// % wfs  |     . \ .
+		//        |     .  \.
+		// 5%     |     .   \_____
+		//        |_____.___.______
+		//        0    50% 80%
+		//          avg cpu usage
+		let cpu_usage_ratio = {
+			self.system
+				.lock()
+				.await
+				.cpu_usage_ratio(self.config.runtime.worker_cpu_max)
+		};
 		let load_shed_ratio_x1000 =
-			calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
+			calc_pull_ratio((cpu_usage_ratio * 1000.0) as u64, 500, 1000, 800, 50);
+
+		// Record load shedding ratio metric
+		metrics::LOAD_SHEDDING_RATIO.record(load_shed_ratio_x1000 as f64 / 1000.0, &[]);
 
 		let active_worker_subspace_start = tx.pack(
 			&keys::worker::ActiveWorkerIdxKey::subspace(active_workers_after),
@@ -1039,32 +1079,52 @@ impl Database for DatabaseKv {
 					Ok(key.worker_id)
 				})
 				.try_collect::<Vec<_>>(),
-			futures_util::stream::iter(owned_filter)
-				.map(|wf_name| {
-					let wake_subspace_start = end_of_key_range(&tx.pack(
-						&keys::wake::WorkflowWakeConditionKey::subspace_without_ts(
-							wf_name.clone(),
-						),
-					));
-					let wake_subspace_end =
-						tx.pack(&keys::wake::WorkflowWakeConditionKey::subspace(
-							wf_name,
-							pull_before,
-						));
-
-					tx.get_ranges_keyvalues(
-						universaldb::RangeOption {
-							mode: StreamingMode::WantAll,
-							..(wake_subspace_start, wake_subspace_end).into()
-						},
-						// This is Snapshot to reduce contention with any new wake conditions
-						// being inserted. Conflicts are handled by workflow leases.
-						Snapshot,
-					)
-				})
-				.flatten()
-				.map(|res| tx.unpack::<keys::wake::WorkflowWakeConditionKey>(res?.key()))
-				.try_collect::<Vec<_>>(),
+			async {
+				let start = Instant::now();
+				let mut buffer = Vec::new();
+				let mut stream = futures_util::stream::iter(owned_filter)
+					.map(|wf_name| {
+						let wake_subspace_start = end_of_key_range(&tx.pack(
+							&keys::wake::WorkflowWakeConditionKey::subspace_without_ts(
+								wf_name.clone(),
+							),
+						));
+						let wake_subspace_end =
+							tx.pack(&keys::wake::WorkflowWakeConditionKey::subspace(
+								wf_name,
+								pull_before,
+							));
+
+						tx.get_ranges_keyvalues(
+							universaldb::RangeOption {
+								mode: StreamingMode::WantAll,
+								..(wake_subspace_start, wake_subspace_end).into()
+							},
+							// This is Snapshot to reduce contention with any new wake conditions
+							// being inserted. Conflicts are handled by workflow leases.
+							Snapshot,
+						)
+					})
+					.flatten()
+					.map(|res| {
+						tx.unpack::<keys::wake::WorkflowWakeConditionKey>(res?.key())
+					});
+
+				loop {
+					if start.elapsed() > EARLY_TXN_TIMEOUT {
+						tracing::warn!("timed out pulling wake conditions");
+						break;
+					}
+
+					let Some(wake_key) = stream.try_next().await? else {
+						break;
+					};
+
+					buffer.push(wake_key);
+				}
+
+				anyhow::Ok(buffer)
+			}
 		)?;
 
 		// Sort for consistency across all workers
@@ -1089,67 +1149,77 @@ impl Database for DatabaseKv {
 		let active_worker_count = active_worker_ids.len() as u64;
 
 		// Collect name and deadline ts for each wf id
-		let mut dedup_workflows: Vec<MinimalPulledWorkflow> = Vec::new();
+		let mut dedup_workflows = HashMap::<Id, MinimalPulledWorkflow>::new();
 		for wake_key in &wake_keys {
-			if let Some(wf) = dedup_workflows
-				.iter_mut()
-				.find(|wf| wf.workflow_id == wake_key.workflow_id)
-			{
-				let key_wake_deadline_ts = wake_key.condition.deadline_ts();
+			let Some(wf) = dedup_workflows.get_mut(&wake_key.workflow_id) else {
+				dedup_workflows.insert(
+					wake_key.workflow_id,
+					MinimalPulledWorkflow {
+						workflow_id: wake_key.workflow_id,
+						workflow_name: wake_key.workflow_name.clone(),
+						wake_deadline_ts: wake_key.condition.deadline_ts(),
+						earliest_wake_condition_ts: wake_key.ts,
+					},
+				);
 
-				// Update wake condition ts if earlier
-				if wake_key.ts < wf.earliest_wake_condition_ts {
-					wf.earliest_wake_condition_ts = wake_key.ts;
-				}
-
-				// Update wake deadline ts if earlier
-				if wf.wake_deadline_ts.is_none()
-					|| key_wake_deadline_ts < wf.wake_deadline_ts
-				{
-					wf.wake_deadline_ts = key_wake_deadline_ts;
+				// Hard limit of 10k deduped workflows, this gets further limited to 1000 at
+				// `assigned_workflows`
+				if dedup_workflows.len() >= 10000 {
+					break;
 				}
 
 				continue;
+			};
+
+			let key_wake_deadline_ts = wake_key.condition.deadline_ts();
+
+			// Update wake condition ts if earlier
+			if wake_key.ts < wf.earliest_wake_condition_ts {
+				wf.earliest_wake_condition_ts = wake_key.ts;
 			}
 
-			dedup_workflows.push(MinimalPulledWorkflow {
-				workflow_id: wake_key.workflow_id,
-				workflow_name: wake_key.workflow_name.clone(),
-				wake_deadline_ts: wake_key.condition.deadline_ts(),
-				earliest_wake_condition_ts: wake_key.ts,
-			});
+			// Update wake deadline ts if earlier
+			if wf.wake_deadline_ts.is_none() || key_wake_deadline_ts < wf.wake_deadline_ts {
+				wf.wake_deadline_ts = key_wake_deadline_ts;
+			}
 		}
 
 		// Filter workflows in a way that spreads all current pending workflows across all active
 		// workers evenly
-		let assigned_workflows = dedup_workflows.into_iter().filter(|wf| {
-			let mut hasher = DefaultHasher::new();
-
-			// Earliest wake condition ts is consistent for hashing purposes because when it
-			// changes it means a worker has leased it
-			wf.earliest_wake_condition_ts.hash(&mut hasher);
-			let wf_hash = hasher.finish();
-
-			let pseudorandom_value_x1000 = {
-				// Add a little pizazz to the hash so its a different number than wf_hash but
-				// still consistent
-				1234i32.hash(&mut hasher);
-				hasher.finish() % 1000 // 0-1000
-			};
-
-			if pseudorandom_value_x1000 < load_shed_ratio_x1000 {
-				return false;
-			}
-
-			let wf_worker_idx = wf_hash % active_worker_count;
-
-			// Every worker pulls workflows that has to the current worker as well as the next
-			// worker for redundancy. this results in increased txn conflicts but less chance of
-			// orphaned workflows
-			let next_worker_idx = (current_worker_idx + 1) % active_worker_count;
-
-			wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
-		});
+		let assigned_workflows = dedup_workflows
+			.into_values()
+			.filter(|wf| {
+				let mut hasher = DefaultHasher::new();
+
+				// Earliest wake condition ts is consistent for hashing purposes because when it
+				// changes it means a worker has leased it
+				wf.earliest_wake_condition_ts.hash(&mut hasher);
+				let wf_hash = hasher.finish();
+
+				let pseudorandom_value_x1000 = {
+					// Add a little pizazz to the hash so it's a different number than wf_hash but
+					// still consistent
+					1234i32.hash(&mut hasher);
+					hasher.finish() % 1000 // 0-1000
+				};
+
+				if pseudorandom_value_x1000 > load_shed_ratio_x1000 {
+					return false;
+				}
+
+				let wf_worker_idx = wf_hash % active_worker_count;
+
+				// Every worker pulls workflows that hash to the current worker as well as the next
+				// worker for redundancy. This results in increased txn conflicts but less chance of
+				// orphaned workflows
+				let next_worker_idx = (current_worker_idx + 1) % active_worker_count;
+
+				wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
+			})
+			// Hard limit of 1000 workflows per pull
+			.take(1000);
 
 		// Check leases
 		let leased_workflows = futures_util::stream::iter(assigned_workflows)
@@ -1252,6 +1322,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("pull_workflows_tx"))
 			.await
+			.context("failed to lease workflows")
 			.map_err(WorkflowError::Udb)?;
 
 		let worker_id_str = worker_id.to_string();
@@ -1285,8 +1356,10 @@ impl Database for DatabaseKv {
 					let ray_id_key = keys::workflow::RayIdKey::new(wf.workflow_id);
 					let input_key = keys::workflow::InputKey::new(wf.workflow_id);
 					let state_key = keys::workflow::StateKey::new(wf.workflow_id);
+					let output_key = keys::workflow::OutputKey::new(wf.workflow_id);
 					let input_subspace = self.subspace.subspace(&input_key);
 					let state_subspace = self.subspace.subspace(&state_key);
+					let output_subspace = self.subspace.subspace(&output_key);
 					let active_history_subspace = self.subspace.subspace(
 						&keys::history::HistorySubspaceKey::new(
 							wf.workflow_id,
@@ -1299,36 +1372,39 @@ impl Database for DatabaseKv {
 						ray_id_entry,
 						input_chunks,
 						state_chunks,
+						has_output,
 						events,
 					) = tokio::try_join!(
-						async {
-							tx.get(&self.subspace.pack(&create_ts_key), Serializable)
-								.await
-						},
-						async {
-							tx.get(&self.subspace.pack(&ray_id_key), Serializable).await
-						},
-						async {
-							tx.get_ranges_keyvalues(
-								universaldb::RangeOption {
-									mode: StreamingMode::WantAll,
-									..(&input_subspace).into()
-								},
-								Serializable,
-							)
-							.try_collect::<Vec<_>>()
-							.await
-						},
+						tx.get(&self.subspace.pack(&create_ts_key), Serializable),
+						tx.get(&self.subspace.pack(&ray_id_key), Serializable),
+						tx.get_ranges_keyvalues(
+							universaldb::RangeOption {
+								mode: StreamingMode::WantAll,
+								..(&input_subspace).into()
+							},
+							Serializable,
+						)
+						.try_collect::<Vec<_>>(),
+						tx.get_ranges_keyvalues(
+							universaldb::RangeOption {
+								mode: StreamingMode::WantAll,
+								..(&state_subspace).into()
+							},
+							Serializable,
+						)
+						.try_collect::<Vec<_>>(),
 						async {
 							tx.get_ranges_keyvalues(
 								universaldb::RangeOption {
-									mode: StreamingMode::WantAll,
-									..(&state_subspace).into()
+									mode: StreamingMode::Exact,
+									limit: Some(1),
+									..(&output_subspace).into()
 								},
 								Serializable,
 							)
-							.try_collect::<Vec<_>>()
+							.try_next()
 							.await
+							.map(|entry| entry.is_some())
 						},
 						async {
 							let mut events_by_location: HashMap> =
@@ -1523,6 +1599,19 @@ impl Database for DatabaseKv {
 						}
 					)?;
 
+					if has_output {
+						tracing::warn!(workflow_id=?wf.workflow_id, "workflow already completed, ignoring");
+
+						// Clear lease
+						let lease_key = keys::workflow::LeaseKey::new(wf.workflow_id);
+						tx.clear(&self.subspace.pack(&lease_key));
+						let worker_id_key =
+							keys::workflow::WorkerIdKey::new(wf.workflow_id);
+						tx.clear(&self.subspace.pack(&worker_id_key));
+
+						return Ok(None);
+					}
+
 					let create_ts = create_ts_key
 						.deserialize(&create_ts_entry.context("key should exist")?)?;
 					let ray_id = ray_id_key
@@ -1534,7 +1623,7 @@ impl Database for DatabaseKv {
 						state_key.combine(state_chunks)?
 					};
 
-					Result::<_>::Ok(PulledWorkflowData {
+					anyhow::Ok(Some(PulledWorkflowData {
 						workflow_id: wf.workflow_id,
 						workflow_name: wf.workflow_name,
 						create_ts,
@@ -1543,11 +1632,12 @@ impl Database for DatabaseKv {
 						state,
 						wake_deadline_ts: wf.wake_deadline_ts,
 						events,
-					})
+					}))
 				}
 			})
 			// TODO: How to get rid of this buffer?
 			.buffer_unordered(512)
+			.try_filter_map(|x| std::future::ready(Ok(x)))
 			.try_collect::<Vec<_>>()
 			.instrument(tracing::trace_span!("map_to_partial_workflow"))
 			.await
@@ -1555,6 +1645,7 @@ impl Database for DatabaseKv {
 			})
 			.custom_instrument(tracing::info_span!("pull_workflow_history_tx"))
 			.await
+			.context("failed to pull workflow history")
 			.map_err(WorkflowError::Udb)?;
 
 		let dt2 = start_instant2.elapsed().as_secs_f64();
@@ -3112,13 +3203,16 @@ fn value_to_str(v: &serde_json::Value) -> WorkflowResult {
 }
 
 fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
-	// must have neg slope, inversely proportional
+	// Must have neg slope, inversely proportional
 	assert!(ax < bx);
 	assert!(ay > by);
 
-	let neg_dy = ay - by;
+	// Bound domain
+	let x = x.clamp(ax, bx);
+
 	let dx = bx - ax;
-	let neg_b = ay * neg_dy / dx;
+	let neg_dy = ay - by;
+	let b = ay + ax * neg_dy / dx;
 
-	return neg_b.saturating_sub(x * neg_dy / dx);
+	return b.saturating_sub(x * neg_dy / dx);
 }
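The rewritten `calc_pull_ratio` is a clamped linear map sending `x = ax` to `ay` and `x = bx` to `by`; at the call site (`500, 1000, 800, 50`) that means at or below 50% CPU the worker pulls all pending workflows (1000‰), and at or above 80% CPU it pulls only 5% (50‰). A small self-contained check of those endpoints, duplicating the function under the diff's parameters (integer division makes the ramp slightly stair-stepped, but the endpoints are exact):

```rust
// x is average CPU usage in permille of worker_cpu_max; the result is the
// permille of pending workflows this worker will still pull.
fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    assert!(ax < bx);
    assert!(ay > by);
    let x = x.clamp(ax, bx);
    let dx = bx - ax;
    let neg_dy = ay - by;
    let b = ay + ax * neg_dy / dx;
    b.saturating_sub(x * neg_dy / dx)
}

#[test]
fn pull_ratio_endpoints() {
    // Below 50% CPU: pull everything (1000‰).
    assert_eq!(calc_pull_ratio(200, 500, 1000, 800, 50), 1000);
    // Midpoint of the ramp (65% CPU).
    assert_eq!(calc_pull_ratio(650, 500, 1000, 800, 50), 525);
    // At/above 80% CPU: pull only 5% (50‰).
    assert_eq!(calc_pull_ratio(900, 500, 1000, 800, 50), 50);
}
```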
diff --git a/engine/packages/gasoline/src/db/kv/system.rs b/engine/packages/gasoline/src/db/kv/system.rs
index fc09dee12c..ab325be1d9 100644
--- a/engine/packages/gasoline/src/db/kv/system.rs
+++ b/engine/packages/gasoline/src/db/kv/system.rs
@@ -1,33 +1,162 @@
-use std::time::Instant;
+use std::fs;
+use std::time::{Duration, Instant};
 
-use sysinfo::{CpuRefreshKind, MINIMUM_CPU_UPDATE_INTERVAL, RefreshKind, System};
+use sysinfo::{CpuRefreshKind, Pid, ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
+
+const CPU_UPDATE_INTERVAL: Duration = Duration::from_millis(150);
 
 pub struct SystemInfo {
 	system: System,
+	pid: Pid,
 	last_cpu_usage_read: Instant,
+	cgroup_cpu_max: Option<CgroupCpuMax>,
+	last_cgroup_usage_usec: Option<u64>,
+	last_cgroup_cores: f32,
 }
 
 impl SystemInfo {
	pub fn new() -> Self {
 		SystemInfo {
 			system: System::new_with_specifics(
-				RefreshKind::nothing().with_cpu(CpuRefreshKind::nothing().with_cpu_usage()),
+				RefreshKind::nothing()
+					.with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
+					.with_processes(ProcessRefreshKind::nothing().with_cpu()),
 			),
+			pid: Pid::from_u32(std::process::id()),
 			last_cpu_usage_read: Instant::now(),
+			cgroup_cpu_max: CgroupCpuMax::read(),
+			last_cgroup_usage_usec: CgroupCpuUsage::read(),
+			last_cgroup_cores: 0.0,
 		}
 	}
 
+	/// Returns the ratio of avg cpu usage to the configured max for the current container (if
+	/// cgroups are configured) or otherwise for the current process. Usually 0.0-1.0, but may
+	/// exceed 1.0 since the max is not a hard limit.
+	pub fn cpu_usage_ratio(&mut self, cpu_max: Option) -> f32 {
+		// 1 = 1 core
+		let cpu_max = if let Some(cpu_max) = cpu_max {
+			cpu_max as f32 / 1000.0
+		} else if let Some(CgroupCpuMax { quota, period }) = self.cgroup_cpu_max {
+			if quota > 0 {
+				quota as f32 / period as f32
+			} else {
+				// Negative quota means unlimited, use cpu count
+				self.system.cpus().len() as f32
+			}
+		} else {
+			self.system.cpus().len() as f32
+		};
+
+		let total = if let Some(last_usage_usec) = self.last_cgroup_usage_usec {
+			// Use cgroup cpu.stat for usage (cumulative counter)
+			if self.last_cpu_usage_read.elapsed() > CPU_UPDATE_INTERVAL {
+				if let Some(current_usage_usec) = CgroupCpuUsage::read() {
+					let elapsed_usec = self.last_cpu_usage_read.elapsed().as_micros() as u64;
+					let usage_delta_usec = current_usage_usec.saturating_sub(last_usage_usec);
+
+					// Calculate cores used: (usage_delta / elapsed_time)
+					let cores_used = if elapsed_usec > 0 {
+						usage_delta_usec as f32 / elapsed_usec as f32
+					} else {
+						0.0
+					};
+
+					self.last_cgroup_usage_usec = Some(current_usage_usec);
+					self.last_cpu_usage_read = Instant::now();
+					self.last_cgroup_cores = cores_used;
+
+					cores_used
+				} else {
+					// Failed to read cgroup, disable cgroup usage tracking
+					self.last_cgroup_usage_usec = None;
+					0.0
+				}
+			} else {
+				// Not time to update yet, return last calculated value
+				self.last_cgroup_cores
+			}
+		} else {
+			// Use per-process CPU metrics
+			if self.last_cpu_usage_read.elapsed() > CPU_UPDATE_INTERVAL {
+				self.system.refresh_processes_specifics(
+					ProcessesToUpdate::Some(&[self.pid]),
+					true,
+					ProcessRefreshKind::nothing().with_cpu(),
+				);
+				self.last_cpu_usage_read = Instant::now();
+			}
+
+			// Get CPU usage for current process (returns percentage 0-100 per core)
+			self.system
+				.process(self.pid)
+				.map(|p| p.cpu_usage() / 100.0)
+				.unwrap_or(0.0)
+		};
+
+		crate::metrics::CPU_USAGE.record(total as f64, &[]);
+
+		total / cpu_max
+	}
+}
+
+struct CgroupCpuMax {
+	quota: i64,
+	period: u64,
+}
+
+impl CgroupCpuMax {
+	fn read() -> Option<Self> {
+		// cgroups v2
+		if let Ok(content) = fs::read_to_string("/sys/fs/cgroup/cpu.max") {
+			let parts = content.trim().split_whitespace().collect::<Vec<_>>();
+			if parts.len() == 2 {
+				return Some(CgroupCpuMax {
+					quota: parts[0].parse::<i64>().ok()?,
+					period: parts[1].parse::<u64>().ok()?,
+				});
+			}
+		}
+
+		// cgroups v1
+		let quota = fs::read_to_string("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
+			.ok()?
+			.trim()
+			.parse()
+			.ok()?;
+		let period = fs::read_to_string("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
+			.ok()?
+			.trim()
+			.parse()
+			.ok()?;
+
+		Some(CgroupCpuMax { quota, period })
+	}
+}
+
+struct CgroupCpuUsage;
+
+impl CgroupCpuUsage {
+	/// Reads CPU usage from cgroup cpu.stat
+	/// Returns usage in microseconds
+	fn read() -> Option<u64> {
+		// cgroups v2
+		if let Ok(content) = fs::read_to_string("/sys/fs/cgroup/cpu.stat") {
+			for line in content.lines() {
+				if let Some(usage) = line.strip_prefix("usage_usec ") {
+					return usage.trim().parse().ok();
+				}
+			}
+		}
 
-	/// Returns a float 0.0-100.0 of the avg cpu usage over the entire system.
-	pub fn cpu_usage(&mut self) -> f32 {
-		if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
-			self.system.refresh_cpu_usage();
-			self.last_cpu_usage_read = Instant::now();
+		// cgroups v1
+		if let Ok(content) = fs::read_to_string("/sys/fs/cgroup/cpuacct/cpuacct.usage") {
+			// cpuacct.usage is in nanoseconds, convert to microseconds
			let usage_nsec: u64 = content.trim().parse().ok()?;
+			return Some(usage_nsec / 1000);
 		}
 
-		self.system
-			.cpus()
-			.iter()
-			.fold(0.0, |s, cpu| s + cpu.cpu_usage())
-			/ self.system.cpus().len() as f32
+		None
 	}
 }
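The cgroup path above derives "cores in use" from a rate: `cpu.stat`'s `usage_usec` is a cumulative counter, so the delta over a sampling window divided by wall-clock microseconds gives average cores, which is then divided by `quota / period` from `cpu.max`. A runnable sketch of that derivation on a cgroup v2 host (paths are the standard v2 mount, not project-specific):

```rust
use std::{fs, thread, time::Duration};

// Read the cumulative usage_usec counter from cgroup v2's cpu.stat.
fn usage_usec() -> Option<u64> {
    fs::read_to_string("/sys/fs/cgroup/cpu.stat")
        .ok()?
        .lines()
        .find_map(|l| l.strip_prefix("usage_usec "))
        .and_then(|v| v.trim().parse().ok())
}

fn main() {
    let window = Duration::from_millis(150);
    let before = usage_usec();
    thread::sleep(window);
    let after = usage_usec();
    if let (Some(a), Some(b)) = (before, after) {
        // Rate of the counter over the window = average cores in use.
        let cores = b.saturating_sub(a) as f64 / window.as_micros() as f64;
        println!("avg cores over window: {cores:.2}");
    }
}
```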
diff --git a/engine/packages/gasoline/src/db/mod.rs b/engine/packages/gasoline/src/db/mod.rs
index 1e175ea3af..0909ecfacd 100644
--- a/engine/packages/gasoline/src/db/mod.rs
+++ b/engine/packages/gasoline/src/db/mod.rs
@@ -24,7 +24,7 @@ pub type DatabaseHandle = Arc;
 #[async_trait::async_trait]
 pub trait Database: Send {
 	/// Create a new DB instance.
-	async fn from_pools(pools: rivet_pools::Pools) -> Result<Arc<Self>>
+	async fn new(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<Arc<Self>>
 	where
 		Self: Sized;
 
diff --git a/engine/packages/gasoline/src/error.rs b/engine/packages/gasoline/src/error.rs
index c2e30c7095..69fa449b30 100644
--- a/engine/packages/gasoline/src/error.rs
+++ b/engine/packages/gasoline/src/error.rs
@@ -9,17 +9,17 @@ pub type WorkflowResult = Result;
 
 #[derive(thiserror::Error, Debug)]
 pub enum WorkflowError {
-	#[error("workflow failure: {0:?}")]
-	WorkflowFailure(#[source] anyhow::Error),
+	#[error("workflow {0} failed: {1:?}")]
+	WorkflowFailure(&'static str, #[source] anyhow::Error),
 
 	// Includes error count
-	#[error("activity failure: {0:?}")]
-	ActivityFailure(#[source] anyhow::Error, usize),
+	#[error("activity {0} failed: {1:?}")]
+	ActivityFailure(&'static str, #[source] anyhow::Error, usize),
 
-	#[error("activity failure, max retries reached: {0:?}")]
-	ActivityMaxFailuresReached(#[source] anyhow::Error),
+	#[error("activity {0} failed, max retries reached: {1:?}")]
+	ActivityMaxFailuresReached(&'static str, #[source] anyhow::Error),
 
-	#[error("operation failure ({0}): {1:?}")]
+	#[error("operation {0} failed: {1:?}")]
 	OperationFailure(&'static str, #[source] anyhow::Error),
 
 	#[error("workflow missing from registry: {0}")]
@@ -146,8 +146,8 @@ pub enum WorkflowError {
 	Config(#[source] anyhow::Error),
 
 	// Includes error count
-	#[error("activity timed out")]
-	ActivityTimeout(usize),
+	#[error("activity {0} timed out")]
+	ActivityTimeout(&'static str, usize),
 
 	// Includes error count
 	#[error("operation {0} timed out")]
@@ -186,8 +186,8 @@ impl WorkflowError {
 	/// Returns the next deadline for a workflow to be woken up again based on the error.
 	pub(crate) fn deadline_ts(&self) -> Option {
 		match self {
-			WorkflowError::ActivityFailure(_, error_count)
-			| WorkflowError::ActivityTimeout(error_count)
+			WorkflowError::ActivityFailure(_, _, error_count)
+			| WorkflowError::ActivityTimeout(_, error_count)
 			| WorkflowError::OperationTimeout(_, error_count) => {
 				// NOTE: Max retry is handled in `WorkflowCtx::activity`
 				let mut backoff = rivet_util::backoff::Backoff::new_at(
@@ -218,8 +218,8 @@ impl WorkflowError {
 	/// Any error that the workflow can continue on with its execution from.
 	pub(crate) fn is_recoverable(&self) -> bool {
 		match self {
-			WorkflowError::ActivityFailure(_, _)
-			| WorkflowError::ActivityTimeout(_)
+			WorkflowError::ActivityFailure(_, _, _)
+			| WorkflowError::ActivityTimeout(_, _)
 			| WorkflowError::OperationTimeout(_, _)
 			| WorkflowError::NoSignalFound(_)
 			| WorkflowError::NoSignalFoundAndSleep(_, _)
@@ -233,8 +233,8 @@ impl WorkflowError {
 	/// Any error that the workflow can try again on a fixed number of times. Only used for printing.
 	pub(crate) fn is_retryable(&self) -> bool {
 		match self {
-			WorkflowError::ActivityFailure(_, _)
-			| WorkflowError::ActivityTimeout(_)
+			WorkflowError::ActivityFailure(_, _, _)
+			| WorkflowError::ActivityTimeout(_, _)
 			| WorkflowError::OperationTimeout(_, _) => true,
 			_ => false,
 		}
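With the `&'static str` name baked into the failure variants, logs and dead-workflow errors now identify the failing step directly instead of an anonymous "activity failure". A quick illustration of what the thiserror-derived `Display` yields (the activity name here is hypothetical):

```rust
// The name travels with the error, so Display is self-describing.
let err = WorkflowError::ActivityTimeout("actor_allocate", 2);
assert_eq!(err.to_string(), "activity actor_allocate timed out");
```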
diff --git a/engine/packages/gasoline/src/metrics.rs b/engine/packages/gasoline/src/metrics.rs
index b4f05d8bb1..3015ddccc9 100644
--- a/engine/packages/gasoline/src/metrics.rs
+++ b/engine/packages/gasoline/src/metrics.rs
@@ -148,4 +148,14 @@ lazy_static::lazy_static! {
 	pub static ref OPERATION_ERRORS: Counter<u64> = METER.u64_counter("rivet_gasoline_operation_errors")
 		.with_description("All errors made by this operation.")
 		.build();
+
+	// MARK: Load Shedding
+	pub static ref CPU_USAGE: Histogram<f64> = METER.f64_histogram("rivet_gasoline_cpu_usage")
+		.with_description("CPU usage in cores (1.0 = one core).")
+		.with_boundaries(vec![0.0, 0.1, 0.25, 0.5, 1.0, 1.5, 2.0, 4.0, 8.0, 16.0])
+		.build();
+	pub static ref LOAD_SHEDDING_RATIO: Histogram<f64> = METER.f64_histogram("rivet_gasoline_load_shedding_ratio")
+		.with_description("Load shedding ratio (0-1) based on CPU usage, determining the fraction of workflows to pull.")
+		.with_boundaries(vec![0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0])
+		.build();
 }
diff --git a/engine/packages/gasoline/src/registry.rs b/engine/packages/gasoline/src/registry.rs
index fe9905e4c8..aeb65eb4b2 100644
--- a/engine/packages/gasoline/src/registry.rs
+++ b/engine/packages/gasoline/src/registry.rs
@@ -72,7 +72,9 @@ impl Registry {
 				// Differentiate between WorkflowError and user error
 				Err(err) => match err.downcast::<WorkflowError>() {
 					Ok(inner_err) => return Err(inner_err),
-					Err(err) => return Err(WorkflowError::WorkflowFailure(err)),
+					Err(err) => {
+						return Err(WorkflowError::WorkflowFailure(W::NAME, err));
+					}
 				},
 			};
 
diff --git a/engine/packages/guard/src/lib.rs b/engine/packages/guard/src/lib.rs
index e72c7c6edd..f4acd5872a 100644
--- a/engine/packages/guard/src/lib.rs
+++ b/engine/packages/guard/src/lib.rs
@@ -12,7 +12,7 @@ pub mod tls;
 pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
 	let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
 	let ctx = StandaloneCtx::new(
-		db::DatabaseKv::from_pools(pools.clone()).await?,
+		db::DatabaseKv::new(config.clone(), pools.clone()).await?,
 		config.clone(),
 		pools,
 		cache,
diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs
index 3b2fcca0ea..2eef4ae81b 100644
--- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs
+++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs
@@ -142,6 +142,8 @@ async fn handle_message_mk2(
 		}
 	};
 
+	tracing::debug!(?msg, "received message from runner");
+
 	match msg {
 		protocol::mk2::ToServer::ToServerPong(pong) => {
 			let now = util::timestamp::now();
diff --git a/engine/packages/pegboard/src/lib.rs b/engine/packages/pegboard/src/lib.rs
index 59e107384a..ca9c09182b 100644
--- a/engine/packages/pegboard/src/lib.rs
+++ b/engine/packages/pegboard/src/lib.rs
@@ -16,8 +16,8 @@ pub fn registry() -> WorkflowResult {
 	registry.register_workflow::()?;
 	registry.register_workflow::()?;
 	registry.register_workflow::()?;
-	registry.register_workflow::()?;
 	registry.register_workflow::()?;
+	registry.register_workflow::()?;
 
 	Ok(registry)
 }
diff --git a/engine/packages/pegboard/src/workflows/actor/destroy.rs b/engine/packages/pegboard/src/workflows/actor/destroy.rs
index d73493ed20..8c78ca62a3 100644
--- a/engine/packages/pegboard/src/workflows/actor/destroy.rs
+++ b/engine/packages/pegboard/src/workflows/actor/destroy.rs
@@ -98,7 +98,6 @@ async fn update_state_and_db(
 	let runner_id = state.runner_id.clone();
 	let namespace_id = state.namespace_id.clone();
 	let runner_name_selector = state.runner_name_selector.clone();
-	let for_serverless = state.for_serverless.clone();
 	let allocated_serverless_slot = state.allocated_serverless_slot.clone();
 	let name = state.name.clone();
 	let create_ts = state.create_ts.clone();
@@ -109,28 +108,15 @@ async fn update_state_and_db(
 
 			tx.write(&keys::actor::DestroyTsKey::new(input.actor_id), destroy_ts)?;
 
-			if let Some(runner_id) = runner_id {
-				clear_slot(
-					input.actor_id,
-					namespace_id,
-					&runner_name_selector,
-					runner_id,
-					for_serverless,
-					&tx,
-				)
-				.await?;
-			} else if allocated_serverless_slot {
-				// Clear the serverless slot even if we do not have a runner id. This happens when the
-				// actor is destroyed while pending allocation
-				tx.atomic_op(
-					&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
-						namespace_id,
-						runner_name_selector.clone(),
-					),
-					&(-1i64).to_le_bytes(),
-					MutationType::Add,
-				);
-			}
+			clear_slot(
+				input.actor_id,
+				namespace_id,
+				&runner_name_selector,
+				runner_id,
+				allocated_serverless_slot,
+				&tx,
+			)
+			.await?;
 
 			// Update namespace indexes
 			tx.delete(&keys::ns::ActiveActorKey::new(
@@ -209,84 +195,89 @@ pub(crate) async fn clear_slot(
 	actor_id: Id,
 	namespace_id: Id,
 	runner_name_selector: &str,
-	runner_id: Id,
-	for_serverless: bool,
+	runner_id: Option<Id>,
+	allocated_serverless_slot: bool,
 	tx: &universaldb::Transaction,
 ) -> Result<()> {
 	let tx = tx.with_subspace(keys::subspace());
 
-	tx.delete(&keys::actor::RunnerIdKey::new(actor_id));
-
-	// This is cleared when the state changes as well as when the actor is destroyed to ensure
-	// consistency during rescheduling and forced deletion.
-	tx.delete(&keys::runner::ActorKey::new(runner_id, actor_id));
-
-	let runner_workflow_id_key = keys::runner::WorkflowIdKey::new(runner_id);
-	let runner_version_key = keys::runner::VersionKey::new(runner_id);
-	let runner_remaining_slots_key = keys::runner::RemainingSlotsKey::new(runner_id);
-	let runner_total_slots_key = keys::runner::TotalSlotsKey::new(runner_id);
-	let runner_last_ping_ts_key = keys::runner::LastPingTsKey::new(runner_id);
-	let runner_protocol_version_key = keys::runner::ProtocolVersionKey::new(runner_id);
-
-	let (
-		runner_workflow_id,
-		runner_version,
-		runner_remaining_slots,
-		runner_total_slots,
-		runner_last_ping_ts,
-		runner_protocol_version,
-	) = tokio::try_join!(
-		tx.read(&runner_workflow_id_key, Serializable),
-		tx.read(&runner_version_key, Serializable),
-		tx.read(&runner_remaining_slots_key, Serializable),
-		tx.read(&runner_total_slots_key, Serializable),
-		tx.read(&runner_last_ping_ts_key, Serializable),
-		tx.read_opt(&runner_protocol_version_key, Serializable),
-	)?;
-
-	let old_runner_remaining_millislots = (runner_remaining_slots * 1000) / runner_total_slots;
-	let new_runner_remaining_slots = runner_remaining_slots + 1;
-
-	// Write new remaining slots
-	tx.write(&runner_remaining_slots_key, new_runner_remaining_slots)?;
-
-	let old_runner_alloc_key = keys::ns::RunnerAllocIdxKey::new(
-		namespace_id,
-		runner_name_selector.to_string(),
-		runner_version,
-		old_runner_remaining_millislots,
-		runner_last_ping_ts,
-		runner_id,
-	);
-
-	// Only update allocation idx if it existed before
-	if tx.exists(&old_runner_alloc_key, Serializable).await? {
-		// Clear old key
-		tx.delete(&old_runner_alloc_key);
-
-		let new_remaining_millislots = (new_runner_remaining_slots * 1000) / runner_total_slots;
-		let new_runner_alloc_key = keys::ns::RunnerAllocIdxKey::new(
+	// Only clear slot if we have a runner id
+	if let Some(runner_id) = runner_id {
+		tx.delete(&keys::actor::RunnerIdKey::new(actor_id));
+
+		// This is cleared when the state changes as well as when the actor is destroyed to ensure
+		// consistency during rescheduling and forced deletion.
+		tx.delete(&keys::runner::ActorKey::new(runner_id, actor_id));
+
+		let runner_workflow_id_key = keys::runner::WorkflowIdKey::new(runner_id);
+		let runner_version_key = keys::runner::VersionKey::new(runner_id);
+		let runner_remaining_slots_key = keys::runner::RemainingSlotsKey::new(runner_id);
+		let runner_total_slots_key = keys::runner::TotalSlotsKey::new(runner_id);
+		let runner_last_ping_ts_key = keys::runner::LastPingTsKey::new(runner_id);
+		let runner_protocol_version_key = keys::runner::ProtocolVersionKey::new(runner_id);
+
+		let (
+			runner_workflow_id,
+			runner_version,
+			runner_remaining_slots,
+			runner_total_slots,
+			runner_last_ping_ts,
+			runner_protocol_version,
+		) = tokio::try_join!(
+			tx.read(&runner_workflow_id_key, Serializable),
+			tx.read(&runner_version_key, Serializable),
+			tx.read(&runner_remaining_slots_key, Serializable),
+			tx.read(&runner_total_slots_key, Serializable),
+			tx.read(&runner_last_ping_ts_key, Serializable),
+			tx.read_opt(&runner_protocol_version_key, Serializable),
+		)?;
+
+		let old_runner_remaining_millislots = (runner_remaining_slots * 1000) / runner_total_slots;
+		let new_runner_remaining_slots = runner_remaining_slots + 1;
+
+		// Write new remaining slots
+		tx.write(&runner_remaining_slots_key, new_runner_remaining_slots)?;
+
+		let old_runner_alloc_key = keys::ns::RunnerAllocIdxKey::new(
 			namespace_id,
 			runner_name_selector.to_string(),
 			runner_version,
-			new_remaining_millislots,
+			old_runner_remaining_millislots,
 			runner_last_ping_ts,
 			runner_id,
 		);
 
-		tx.write(
-			&new_runner_alloc_key,
-			rivet_data::converted::RunnerAllocIdxKeyData {
-				workflow_id: runner_workflow_id,
-				remaining_slots: new_runner_remaining_slots,
-				total_slots: runner_total_slots,
-				// We default here because its not important for mk1 protocol runners
-				protocol_version: runner_protocol_version.unwrap_or(PROTOCOL_MK1_VERSION),
-			},
-		)?;
+		// Only update allocation idx if it existed before
+		if tx.exists(&old_runner_alloc_key, Serializable).await? {
+			// Clear old key
+			tx.delete(&old_runner_alloc_key);
+
+			let new_remaining_millislots = (new_runner_remaining_slots * 1000) / runner_total_slots;
+			let new_runner_alloc_key = keys::ns::RunnerAllocIdxKey::new(
+				namespace_id,
+				runner_name_selector.to_string(),
+				runner_version,
+				new_remaining_millislots,
+				runner_last_ping_ts,
+				runner_id,
+			);
+
+			tx.write(
+				&new_runner_alloc_key,
+				rivet_data::converted::RunnerAllocIdxKeyData {
+					workflow_id: runner_workflow_id,
+					remaining_slots: new_runner_remaining_slots,
+					total_slots: runner_total_slots,
+					// We default here because it's not important for mk1 protocol runners
+					protocol_version: runner_protocol_version.unwrap_or(PROTOCOL_MK1_VERSION),
+				},
+			)?;
+		}
 	}
 
-	if for_serverless {
+	if allocated_serverless_slot {
+		// Clear the serverless slot even if we do not have a runner id. This happens when the
+		// actor is destroyed while pending allocation
 		tx.atomic_op(
 			&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
 				namespace_id,
diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs
index 268d3fcde8..a9e42f54ec 100644
--- a/engine/packages/pegboard/src/workflows/actor/runtime.rs
+++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs
@@ -535,7 +535,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result Result Result {
+	let mut state = ctx.state::()?;
+
+	let allocated_serverless_slot = state.allocated_serverless_slot;
+
 	// Clear self from alloc queue
 	let cleared = ctx
 		.udb()?
@@ -1050,13 +1051,28 @@ pub async fn clear_pending_allocation(
 			let exists = tx.get(&pending_alloc_key, Serializable).await?.is_some();
 
-			tx.clear(&pending_alloc_key);
+			if exists {
+				tx.clear(&pending_alloc_key);
+
+				if allocated_serverless_slot {
+					tx.atomic_op(
+						&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
+							input.namespace_id,
+							input.runner_name_selector.clone(),
+						),
+						&(-1i64).to_le_bytes(),
+						MutationType::Add,
+					);
+				}
+			}
 
 			Ok(exists)
 		})
 		.custom_instrument(tracing::info_span!("actor_clear_pending_alloc_tx"))
 		.await?;
 
+	state.allocated_serverless_slot = false;
+
 	Ok(cleared)
 }
 
@@ -1187,6 +1203,8 @@ pub async fn check_runners(
 ) -> Result {
 	ctx.udb()?
 		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+
 			// Diff the list of seen runners with the list of active runners so we know which we can clean up
 			// state
 			let remove_runners = futures_util::stream::iter(input.seen_runners.clone())
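The runner workflow diffs below apply the same early-transaction-timeout guard already introduced in `kv/mod.rs`: stop consuming a range stream well before the database's 5s transaction limit so the transaction can still commit the work done so far, leaving the rest for the next iteration. A generic sketch of that pattern under the same assumptions (a fallible stream inside a retried transaction):

```rust
use std::time::{Duration, Instant};

use futures_util::TryStreamExt;

// Stay comfortably under the 5s TXN_TIMEOUT enforced by the drivers.
const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);

async fn drain_with_deadline<T>(
    mut stream: impl futures_util::Stream<Item = anyhow::Result<T>> + Unpin,
) -> anyhow::Result<Vec<T>> {
    let start = Instant::now();
    let mut buffer = Vec::new();
    loop {
        // Bail out early; the caller commits the partial batch and resumes
        // from a saved cursor on the next transaction.
        if start.elapsed() > EARLY_TXN_TIMEOUT {
            tracing::warn!("timed out draining stream, committing partial batch");
            break;
        }
        let Some(item) = stream.try_next().await? else { break };
        buffer.push(item);
    }
    Ok(buffer)
}
```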
diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs
index 4d06a657b5..ce4fc2be6b 100644
--- a/engine/packages/pegboard/src/workflows/runner.rs
+++ b/engine/packages/pegboard/src/workflows/runner.rs
@@ -1,3 +1,5 @@
+use std::time::{Duration, Instant};
+
 use futures_util::{FutureExt, StreamExt, TryStreamExt};
 use gas::prelude::*;
 use rivet_data::converted::{ActorNameKeyData, MetadataKeyData, RunnerByKeyKeyData};
@@ -14,6 +16,7 @@ use crate::{keys, metrics, workflows::actor::Allocate};
 
 /// Batch size of how many events to ack.
 const EVENT_ACK_BATCH_SIZE: i64 = 500;
+const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct Input {
@@ -965,6 +968,7 @@ pub(crate) async fn allocate_pending_actors(
 	let (allocations, pending_actor_count) = ctx
 		.udb()?
 		.run(|tx| async move {
+			let start = Instant::now();
 			let tx = tx.with_subspace(keys::subspace());
 
 			let mut allocations = Vec::new();
@@ -987,6 +991,11 @@ pub(crate) async fn allocate_pending_actors(
 			let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;
 
 			'queue_loop: loop {
+				if start.elapsed() > EARLY_TXN_TIMEOUT {
+					tracing::warn!("timed out processing pending actors queue");
+					break;
+				}
+
 				let Some(queue_entry) = queue_stream.try_next().await? else {
 					break;
 				};
@@ -1013,6 +1022,11 @@ pub(crate) async fn allocate_pending_actors(
 				let mut highest_version = None;
 
 				loop {
+					if start.elapsed() > EARLY_TXN_TIMEOUT {
+						tracing::warn!("timed out processing pending actors queue");
+						break 'queue_loop;
+					}
+
 					let Some(entry) = stream.try_next().await? else {
 						break;
 					};
diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs
index fe39291caa..174bfd66f9 100644
--- a/engine/packages/pegboard/src/workflows/runner2.rs
+++ b/engine/packages/pegboard/src/workflows/runner2.rs
@@ -1,3 +1,5 @@
+use std::time::{Duration, Instant};
+
 use futures_util::{FutureExt, StreamExt, TryStreamExt};
 use gas::prelude::*;
 use rivet_data::converted::RunnerByKeyKeyData;
@@ -12,6 +14,8 @@ use vbare::OwnedVersionedData;
 
 use crate::{keys, metrics, workflows::actor::Allocate};
 
+const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);
+
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct Input {
 	pub runner_id: Id,
@@ -611,6 +615,7 @@ pub(crate) async fn allocate_pending_actors(
 	let (allocations, pending_actor_count) = ctx
 		.udb()?
 		.run(|tx| async move {
+			let start = Instant::now();
 			let tx = tx.with_subspace(keys::subspace());
 
 			let mut allocations = Vec::new();
@@ -633,6 +638,11 @@ pub(crate) async fn allocate_pending_actors(
 			let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;
 
 			'queue_loop: loop {
+				if start.elapsed() > EARLY_TXN_TIMEOUT {
+					tracing::warn!("timed out processing pending actors queue");
+					break;
+				}
+
 				let Some(queue_entry) = queue_stream.try_next().await? else {
 					break;
 				};
@@ -659,6 +669,11 @@ pub(crate) async fn allocate_pending_actors(
 				let mut highest_version = None;
 
 				loop {
+					if start.elapsed() > EARLY_TXN_TIMEOUT {
+						tracing::warn!("timed out processing pending actors queue");
+						break 'queue_loop;
+					}
+
 					let Some(entry) = stream.try_next().await? else {
 						break;
 					};
diff --git a/engine/packages/pegboard/src/workflows/runner_pool.rs b/engine/packages/pegboard/src/workflows/runner_pool.rs
index d1bb91120c..916458a0c1 100644
--- a/engine/packages/pegboard/src/workflows/runner_pool.rs
+++ b/engine/packages/pegboard/src/workflows/runner_pool.rs
@@ -38,6 +38,14 @@ pub async fn pegboard_runner_pool(ctx: &mut WorkflowCtx, input: &Input) -> Result
 		})
 		.await?
 	else {
+		// Drain all
+		for runner in &state.runners {
+			ctx.signal(serverless::receiver::Drain {})
+				.to_workflow_id(runner.receiver_wf_id)
+				.send()
+				.await?;
+		}
+
 		return Ok(Loop::Break(()));
 	};
@@ -96,12 +104,22 @@ pub async fn pegboard_runner_pool(ctx: &mut WorkflowCtx, input: &Input) -> Result
 		}
 
 		// Wait for Bump or serverless signals until we tick again
-		for sig in ctx.listen_n::<Main>(1024).await? {
+		for sig in ctx.listen_n::<Main>(512).await? {
 			match sig {
 				Main::OutboundConnDrainStarted(sig) => {
-					state
-						.runners
-						.retain(|r| r.receiver_wf_id != sig.receiver_wf_id);
+					let (new, drain_started) =
+						std::mem::take(&mut state.runners)
+							.into_iter()
+							.partition::<Vec<_>, _>(|r| r.receiver_wf_id != sig.receiver_wf_id);
+					state.runners = new;
+
+					for runner in drain_started {
+						// TODO: Spawn sub wf to process these so this is not blocking the loop
+						ctx.signal(serverless::receiver::Drain {})
+							.to_workflow_id(runner.receiver_wf_id)
+							.send()
+							.await?;
+					}
 				}
 				Main::Bump(_) => {}
 			}
@@ -190,6 +208,12 @@ async fn read_desired(ctx: &ActivityCtx, input: &ReadDesiredInput) -> Result
-			Ok(OutboundReqOutput::Continue) => {}
+			Ok(OutboundReqOutput::Continue) => {
+				if let Err(err) = ctx
+					.signal(runner_pool::Bump {})
+					// This is ok because bumps are not stateful
+					.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
+					.to_workflow_id(input.pool_wf_id)
+					.send()
+					.await
+				{
+					tracing::debug!(?err, "failed to send bump signal");
+
+					return Ok(OutboundReqOutput::Draining { drain_sent: false });
+				}
+			}
 			Ok(OutboundReqOutput::Draining { drain_sent }) => {
 				return Ok(OutboundReqOutput::Draining { drain_sent });
 			}
@@ -279,8 +292,6 @@ async fn outbound_req_inner(
 		match event {
 			Ok(sse::Event::Open) => {}
 			Ok(sse::Event::Message(msg)) => {
-				tracing::debug!(%msg.data, "received outbound req message");
-
 				if runner_id.is_none() {
 					let data = BASE64.decode(msg.data).context("invalid base64 message")?;
 					let payload =
@@ -396,8 +407,6 @@ async fn finish_non_critical_draining(
 		match event {
 			Ok(sse::Event::Open) => {}
 			Ok(sse::Event::Message(msg)) => {
-				tracing::debug!(%msg.data, ?runner_id, "received outbound req message");
-
 				// If runner_id is none at this point it means we did not send the stopping signal yet, so
 				// send it now
 				if runner_id.is_none() {
@@ -457,7 +466,7 @@ async fn drain_runner(ctx: &ActivityCtx, runner_id: Id) -> Result<()> {
 		})
 		// This is ok, because runner_id changes every retry of outbound_req
 		.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
-		.to_workflow::()
+		.to_workflow::()
 		.tag("runner_id", runner_id)
 		.graceful_not_found()
 		.send()
diff --git a/engine/packages/serverless-backfill/src/lib.rs b/engine/packages/serverless-backfill/src/lib.rs
index 5213a2be43..f1149de4ae 100644
--- a/engine/packages/serverless-backfill/src/lib.rs
+++ b/engine/packages/serverless-backfill/src/lib.rs
@@ -8,7 +8,7 @@ use universaldb::utils::IsolationLevel::*;
 pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
 	let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
 	let ctx = StandaloneCtx::new(
-		db::DatabaseKv::from_pools(pools.clone()).await?,
+		db::DatabaseKv::new(config.clone(), pools.clone()).await?,
 		config.clone(),
 		pools,
 		cache,
diff --git a/engine/packages/universaldb/src/driver/postgres/database.rs b/engine/packages/universaldb/src/driver/postgres/database.rs
index 1ab832997d..6450a1e4a4 100644
--- a/engine/packages/universaldb/src/driver/postgres/database.rs
+++ b/engine/packages/universaldb/src/driver/postgres/database.rs
@@ -16,12 +16,12 @@ use crate::{
 	driver::{BoxFut, DatabaseDriver, Erased},
 	error::DatabaseError,
 	options::DatabaseOption,
+	transaction::TXN_TIMEOUT,
 	utils::{MaybeCommitted, calculate_tx_retry_backoff},
 };
 
 use super::transaction::PostgresTransactionDriver;
 
-const TXN_TIMEOUT: Duration = Duration::from_secs(5);
 const GC_INTERVAL: Duration = Duration::from_secs(5);
 
 pub struct PostgresDatabaseDriver {
diff --git a/engine/packages/universaldb/src/driver/rocksdb/database.rs b/engine/packages/universaldb/src/driver/rocksdb/database.rs
index 23bac0ec5c..e2de487482 100644
--- a/engine/packages/universaldb/src/driver/rocksdb/database.rs
+++ b/engine/packages/universaldb/src/driver/rocksdb/database.rs
@@ -4,7 +4,6 @@ use std::{
 		Arc,
 		atomic::{AtomicI32, Ordering},
 	},
-	time::Duration,
 };
 
 use anyhow::{Context, Result};
@@ -15,6 +14,7 @@ use crate::{
 	driver::{BoxFut, DatabaseDriver, Erased},
 	error::DatabaseError,
 	options::DatabaseOption,
+	transaction::TXN_TIMEOUT,
 	utils::{MaybeCommitted, calculate_tx_retry_backoff},
 };
 
@@ -22,8 +22,6 @@ use super::{
 	transaction::RocksDbTransactionDriver, transaction_conflict_tracker::TransactionConflictTracker,
 };
 
-const TXN_TIMEOUT: Duration = Duration::from_secs(5);
-
 pub struct RocksDbDatabaseDriver {
 	db: Arc,
 	max_retries: AtomicI32,
diff --git a/engine/packages/universaldb/src/transaction.rs b/engine/packages/universaldb/src/transaction.rs
index 96631e9514..0fe9f02060 100644
--- a/engine/packages/universaldb/src/transaction.rs
+++ b/engine/packages/universaldb/src/transaction.rs
@@ -1,4 +1,4 @@
-use std::{future::Future, ops::Deref, pin::Pin, sync::Arc};
+use std::{future::Future, ops::Deref, pin::Pin, sync::Arc, time::Duration};
 
 use anyhow::{Context, Result};
 
@@ -15,6 +15,8 @@ use crate::{
 	value::{Slice, Value, Values},
 };
 
+pub const TXN_TIMEOUT: Duration = Duration::from_secs(5);
+
 #[derive(Clone)]
 pub struct Transaction {
 	pub(crate) driver: Arc,
diff --git a/engine/packages/workflow-worker/src/lib.rs b/engine/packages/workflow-worker/src/lib.rs
index 59765159cc..5b5da02487 100644
--- a/engine/packages/workflow-worker/src/lib.rs
+++ b/engine/packages/workflow-worker/src/lib.rs
@@ -7,7 +7,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
 		.merge(namespace::registry()?)?
 		.merge(epoxy::registry()?)?;
 
-	let db = db::DatabaseKv::from_pools(pools.clone()).await?;
+	let db = db::DatabaseKv::new(config.clone(), pools.clone()).await?;
 	let worker = Worker::new(reg.handle(), db, config, pools);
 
 	// Start worker
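Taken together with the `wf revive` subcommand at the top of this diff, the new `DatabaseDebug::revive_workflows` hook can be exercised end to end. A sketch of the direct call the CLI resolves to (the workflow name and error substring are illustrative, not real values from this PR):

```rust
// Roughly what `wf revive -n pegboard_actor -e "database" -d -p 4` does:
// dry-run a 4-way parallel scan for dead pegboard_actor workflows whose
// error message contains "database", without waking them.
let matched = db
    .revive_workflows(&["pegboard_actor"], &["database"], true, 4)
    .await?;
rivet_term::status::success("Workflows Matched", matched);
```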