diff --git a/Cargo.lock b/Cargo.lock index 7bce7fb09c..05eb7e67d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2500,17 +2500,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "internal" -version = "2.0.24-rc.1" -dependencies = [ - "anyhow", - "gasoline", - "rivet-api-util", - "rivet-types", - "serde", -] - [[package]] name = "io-uring" version = "0.7.9" @@ -2907,8 +2896,6 @@ dependencies = [ "epoxy", "epoxy-protocol", "gasoline", - "internal", - "reqwest", "rivet-api-builder", "rivet-api-types", "rivet-api-util", @@ -3428,6 +3415,8 @@ dependencies = [ "lazy_static", "namespace", "nix 0.30.1", + "reqwest", + "reqwest-eventsource", "rivet-api-types", "rivet-api-util", "rivet-data", @@ -3442,6 +3431,7 @@ dependencies = [ "tracing", "universaldb", "universalpubsub", + "url", "utoipa", "uuid", "vbare", @@ -3528,28 +3518,6 @@ dependencies = [ "vbare", ] -[[package]] -name = "pegboard-serverless" -version = "2.0.24-rc.1" -dependencies = [ - "anyhow", - "base64 0.22.1", - "epoxy", - "gasoline", - "namespace", - "pegboard", - "reqwest", - "reqwest-eventsource", - "rivet-config", - "rivet-runner-protocol", - "rivet-types", - "rivet-util", - "tracing", - "universaldb", - "universalpubsub", - "vbare", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -4490,7 +4458,6 @@ dependencies = [ "namespace", "pegboard", "pegboard-runner", - "pegboard-serverless", "portpicker", "rand 0.8.5", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 0f353940a3..a676f2f9e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = 
["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/internal","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pegboard-serverless","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"] +members = 
["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"] [workspace.package] version = "2.0.24-rc.1" @@ -335,9 +335,6 @@ path = "engine/packages/guard" [workspace.dependencies.rivet-guard-core] path = "engine/packages/guard-core" -[workspace.dependencies.internal] -path = "engine/packages/internal" - [workspace.dependencies.rivet-logs] path = "engine/packages/logs" @@ -356,9 +353,6 @@ path = "engine/packages/pegboard-gateway" [workspace.dependencies.pegboard-runner] path = "engine/packages/pegboard-runner" -[workspace.dependencies.pegboard-serverless] -path = "engine/packages/pegboard-serverless" - [workspace.dependencies.rivet-pools] path = "engine/packages/pools" diff --git 
a/engine/artifacts/errors/serverless_runner_pool.not_found.json b/engine/artifacts/errors/serverless_runner_pool.not_found.json new file mode 100644 index 0000000000..8695b814ec --- /dev/null +++ b/engine/artifacts/errors/serverless_runner_pool.not_found.json @@ -0,0 +1,5 @@ +{ + "code": "not_found", + "group": "serverless_runner_pool", + "message": "No serverless pool for this runner exists." +} \ No newline at end of file diff --git a/engine/packages/api-peer/Cargo.toml b/engine/packages/api-peer/Cargo.toml index 060e9d6a3a..0f4171d64e 100644 --- a/engine/packages/api-peer/Cargo.toml +++ b/engine/packages/api-peer/Cargo.toml @@ -24,11 +24,11 @@ serde.workspace = true serde_json.workspace = true indexmap.workspace = true -tokio.workspace = true -tracing.workspace = true namespace.workspace = true pegboard.workspace = true pegboard-actor-kv.workspace = true +tokio.workspace = true +tracing.workspace = true universalpubsub.workspace = true uuid.workspace = true utoipa.workspace = true diff --git a/engine/packages/api-peer/src/internal.rs b/engine/packages/api-peer/src/internal.rs index e9359c764d..33fccb13be 100644 --- a/engine/packages/api-peer/src/internal.rs +++ b/engine/packages/api-peer/src/internal.rs @@ -29,23 +29,6 @@ pub async fn cache_purge( Ok(CachePurgeResponse {}) } -#[derive(Serialize)] -#[serde(deny_unknown_fields)] -pub struct BumpServerlessAutoscalerResponse {} - -pub async fn bump_serverless_autoscaler( - ctx: ApiCtx, - _path: (), - _query: (), - _body: (), -) -> Result { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await?; - - Ok(BumpServerlessAutoscalerResponse {}) -} - #[derive(Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct SetTracingConfigRequest { diff --git a/engine/packages/api-peer/src/router.rs b/engine/packages/api-peer/src/router.rs index 05fdc31fa8..0ddd1a136b 100644 --- a/engine/packages/api-peer/src/router.rs +++ b/engine/packages/api-peer/src/router.rs @@ -35,10 +35,6 @@ pub 
async fn router( .route("/runners/names", get(runners::list_names)) // MARK: Internal .route("/cache/purge", post(internal::cache_purge)) - .route( - "/bump-serverless-autoscaler", - post(internal::bump_serverless_autoscaler), - ) .route( "/epoxy/coordinator/replica-reconfigure", post(internal::epoxy_replica_reconfigure), diff --git a/engine/packages/api-peer/src/runner_configs.rs b/engine/packages/api-peer/src/runner_configs.rs index 8324984794..045f65155c 100644 --- a/engine/packages/api-peer/src/runner_configs.rs +++ b/engine/packages/api-peer/src/runner_configs.rs @@ -17,7 +17,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result Result Result .await? .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - ctx.op(namespace::ops::runner_config::delete::Input { + ctx.op(pegboard::ops::runner_config::delete::Input { namespace_id: namespace.namespace_id, name: path.runner_name, }) diff --git a/engine/packages/api-public/src/runner_configs/refresh_metadata.rs b/engine/packages/api-public/src/runner_configs/refresh_metadata.rs index 8537e87cea..5880cf3610 100644 --- a/engine/packages/api-public/src/runner_configs/refresh_metadata.rs +++ b/engine/packages/api-public/src/runner_configs/refresh_metadata.rs @@ -88,7 +88,7 @@ async fn refresh_metadata_inner( .collect(); let runner_configs = ctx - .op(namespace::ops::runner_config::get::Input { + .op(pegboard::ops::runner_config::get::Input { runners, bypass_cache: true, }) diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index 57220fe7db..308d807bb7 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -56,6 +56,25 @@ pub struct Pegboard { /// /// **Experimental** pub hibernating_request_eligible_threshold: Option, + /// Time to delay a serverless runner from attempting a new outbound connection after a connection failure. + /// + /// Unit is in milliseconds. 
+ /// + /// **Experimental** + pub serverless_base_retry_timeout: Option, + /// How long a serverless runner goes without connection failures before it's retry count is reset to 0, + /// effectively resetting its backoff to 0. + /// + /// Unit is in milliseconds. + /// + /// **Experimental** + pub serverless_retry_reset_duration: Option, + /// Maximum exponent for the serverless backoff calculation. + /// + /// This controls the maximum backoff duration when serverlessly connecting to runners. + /// + /// **Experimental** + pub serverless_backoff_max_exponent: Option, } impl Pegboard { @@ -91,4 +110,17 @@ impl Pegboard { self.hibernating_request_eligible_threshold .unwrap_or(90_000) } + + pub fn serverless_base_retry_timeout(&self) -> usize { + self.serverless_base_retry_timeout.unwrap_or(2000) + } + + pub fn serverless_retry_reset_duration(&self) -> i64 { + self.serverless_retry_reset_duration + .unwrap_or(10 * 60 * 1000) + } + + pub fn serverless_backoff_max_exponent(&self) -> usize { + self.serverless_backoff_max_exponent.unwrap_or(8) + } } diff --git a/engine/packages/engine/Cargo.toml b/engine/packages/engine/Cargo.toml index efda95749f..8e06b715f1 100644 --- a/engine/packages/engine/Cargo.toml +++ b/engine/packages/engine/Cargo.toml @@ -21,7 +21,6 @@ include_dir.workspace = true indoc.workspace = true lz4_flex.workspace = true pegboard-runner.workspace = true -pegboard-serverless.workspace = true reqwest.workspace = true rivet-api-peer.workspace = true rivet-bootstrap.workspace = true diff --git a/engine/packages/engine/src/run_config.rs b/engine/packages/engine/src/run_config.rs index a268183537..635575993d 100644 --- a/engine/packages/engine/src/run_config.rs +++ b/engine/packages/engine/src/run_config.rs @@ -27,13 +27,6 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { |config, pools| Box::pin(rivet_bootstrap::start(config, pools)), false, ), - Service::new( - "pegboard_serverless", - // There should only be one of these, since it's 
auto-scaling requests - ServiceKind::Singleton, - |config, pools| Box::pin(pegboard_serverless::start(config, pools)), - false, - ), // Core services Service::new( "tracing_reconfigure", diff --git a/engine/packages/gasoline/src/ctx/activity.rs b/engine/packages/gasoline/src/ctx/activity.rs index ce1de28c55..f1bb667e1b 100644 --- a/engine/packages/gasoline/src/ctx/activity.rs +++ b/engine/packages/gasoline/src/ctx/activity.rs @@ -7,16 +7,18 @@ use tokio::sync::Mutex; use tracing::Instrument; use crate::{ + builder::common as builder, ctx::{ common, message::{MessageCtx, SubscriptionHandle}, }, - db::DatabaseHandle, + db::{DatabaseHandle, WorkflowData}, error::{WorkflowError, WorkflowResult}, message::Message, operation::{Operation, OperationInput}, + signal::Signal, utils::tags::AsTags, - workflow::StateGuard, + workflow::{StateGuard, Workflow}, }; pub struct ActivityCtx { @@ -77,6 +79,33 @@ impl ActivityCtx { } impl ActivityCtx { + /// Finds the first incomplete workflow with the given tags. + #[tracing::instrument(skip_all, ret(Debug), fields(workflow_name=W::NAME))] + pub async fn find_workflow(&self, tags: impl AsTags) -> Result> { + common::find_workflow::(&self.db, tags) + .in_current_span() + .await + } + + /// Finds the first incomplete workflow with the given tags. + #[tracing::instrument(skip_all)] + pub async fn get_workflows(&self, workflow_ids: Vec) -> Result> { + common::get_workflows(&self.db, workflow_ids) + .in_current_span() + .await + } + + /// Creates a signal builder. 
+ pub fn signal(&self, body: T) -> builder::signal::SignalBuilder { + builder::signal::SignalBuilder::new( + self.db.clone(), + self.config.clone(), + self.ray_id, + body, + true, + ) + } + #[tracing::instrument(skip_all)] pub fn state(&self) -> Result> { if self.parallelized { diff --git a/engine/packages/gasoline/src/ctx/operation.rs b/engine/packages/gasoline/src/ctx/operation.rs index 167b0f9af4..7e2030205e 100644 --- a/engine/packages/gasoline/src/ctx/operation.rs +++ b/engine/packages/gasoline/src/ctx/operation.rs @@ -101,7 +101,6 @@ impl OperationCtx { /// Creates a signal builder. pub fn signal(&self, body: T) -> builder::signal::SignalBuilder { - // TODO: Add check for from_workflow so you cant dispatch a signal builder::signal::SignalBuilder::new( self.db.clone(), self.config.clone(), diff --git a/engine/packages/gasoline/src/db/mod.rs b/engine/packages/gasoline/src/db/mod.rs index a4e94020ca..0426c2200a 100644 --- a/engine/packages/gasoline/src/db/mod.rs +++ b/engine/packages/gasoline/src/db/mod.rs @@ -315,6 +315,10 @@ impl WorkflowData { .transpose() .map_err(WorkflowError::DeserializeWorkflowOutput) } + + pub fn has_output(&self) -> bool { + self.output.is_some() + } } #[derive(Debug)] diff --git a/engine/packages/internal/Cargo.toml b/engine/packages/internal/Cargo.toml deleted file mode 100644 index 8c85c065b7..0000000000 --- a/engine/packages/internal/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "internal" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -anyhow.workspace = true -gas.workspace = true -rivet-api-util.workspace = true -rivet-types.workspace = true -serde.workspace = true diff --git a/engine/packages/internal/README.md b/engine/packages/internal/README.md deleted file mode 100644 index 0c60920680..0000000000 --- a/engine/packages/internal/README.md +++ /dev/null @@ -1 +0,0 @@ -TODO: move somewhere else diff --git 
a/engine/packages/internal/src/lib.rs b/engine/packages/internal/src/lib.rs deleted file mode 100644 index 01eafd2ecc..0000000000 --- a/engine/packages/internal/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod ops; diff --git a/engine/packages/internal/src/ops/bump_serverless_autoscaler_global.rs b/engine/packages/internal/src/ops/bump_serverless_autoscaler_global.rs deleted file mode 100644 index 402f7829ed..0000000000 --- a/engine/packages/internal/src/ops/bump_serverless_autoscaler_global.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::fmt::Debug; - -use futures_util::StreamExt; -use gas::prelude::*; -use rivet_api_util::{Method, request_remote_datacenter}; - -#[derive(Clone, Debug, Default)] -pub struct Input {} - -#[operation] -pub async fn bump_serverless_autoscaler_global(ctx: &OperationCtx, input: &Input) -> Result<()> { - let dcs = &ctx.config().topology().datacenters; - - let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { - let ctx = ctx.clone(); - - async move { - if dc.datacenter_label == ctx.config().dc_label() { - // Local datacenter - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await - } else { - // Remote datacenter - HTTP request - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - "/bump-serverless-autoscaler", - Method::POST, - Option::<&()>::None, - Option::<&()>::None, - ) - .await - .map(|_| ()) - } - } - })) - .buffer_unordered(16) - .collect::>() - .await; - - // Aggregate results - let result_count = results.len(); - let mut errors = Vec::new(); - for res in results { - if let Err(err) = res { - tracing::error!(?err, "failed to request edge dc"); - errors.push(err); - } - } - - // Error only if all requests failed - if result_count == errors.len() { - if let Some(res) = errors.into_iter().next() { - return Err(res).context("all datacenter requests failed"); - } - } - - Ok(()) -} - -// TODO: This is cloned from api-peer because of a cyclical dependency 
-#[derive(Deserialize)] -pub struct BumpServerlessAutoscalerResponse {} diff --git a/engine/packages/internal/src/ops/cache/mod.rs b/engine/packages/internal/src/ops/cache/mod.rs deleted file mode 100644 index 65ba4904bd..0000000000 --- a/engine/packages/internal/src/ops/cache/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod purge_global; diff --git a/engine/packages/internal/src/ops/cache/purge_global.rs b/engine/packages/internal/src/ops/cache/purge_global.rs deleted file mode 100644 index 982457d89e..0000000000 --- a/engine/packages/internal/src/ops/cache/purge_global.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::fmt::Debug; - -use futures_util::StreamExt; -use gas::prelude::*; -use rivet_api_util::{Method, request_remote_datacenter}; -use rivet_cache::RawCacheKey; -use serde::Serialize; - -#[derive(Clone, Debug, Default)] -pub struct Input { - pub base_key: String, - pub keys: Vec, -} - -#[operation] -pub async fn cache_purge_global(ctx: &OperationCtx, input: &Input) -> Result<()> { - let dcs = &ctx.config().topology().datacenters; - - let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { - let ctx = ctx.clone(); - let input = input.clone(); - - async move { - if dc.datacenter_label == ctx.config().dc_label() { - // Local datacenter - ctx.cache() - .clone() - .request() - .purge(input.base_key, input.keys) - .await - } else { - // Remote datacenter - HTTP request - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - "/cache/purge", - Method::POST, - Option::<&()>::None, - Some(&CachePurgeRequest { - base_key: input.base_key, - keys: input.keys, - }), - ) - .await - .map(|_| ()) - } - } - })) - .buffer_unordered(16) - .collect::>() - .await; - - // Aggregate results - let result_count = results.len(); - let mut errors = Vec::new(); - for res in results { - if let Err(err) = res { - tracing::error!(?err, "failed to request edge dc"); - errors.push(err); - } - } - - // Error only if all requests failed - if result_count == 
errors.len() { - if let Some(res) = errors.into_iter().next() { - return Err(res).context("all datacenter requests failed"); - } - } - - Ok(()) -} - -// TODO: This is cloned from api-peer because of a cyclical dependency -#[derive(Serialize)] -pub struct CachePurgeRequest { - pub base_key: String, - pub keys: Vec, -} - -#[derive(Deserialize)] -pub struct CachePurgeResponse {} diff --git a/engine/packages/internal/src/ops/mod.rs b/engine/packages/internal/src/ops/mod.rs deleted file mode 100644 index ab866e20d5..0000000000 --- a/engine/packages/internal/src/ops/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod bump_serverless_autoscaler_global; -pub mod cache; diff --git a/engine/packages/namespace/Cargo.toml b/engine/packages/namespace/Cargo.toml index c9d6e953e4..e9e627e46b 100644 --- a/engine/packages/namespace/Cargo.toml +++ b/engine/packages/namespace/Cargo.toml @@ -10,8 +10,6 @@ anyhow.workspace = true epoxy-protocol.workspace = true epoxy.workspace = true gas.workspace = true -internal.workspace = true -reqwest.workspace = true rivet-api-builder.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true diff --git a/engine/packages/namespace/src/keys/mod.rs b/engine/packages/namespace/src/keys/mod.rs index b04883c981..020c834f8d 100644 --- a/engine/packages/namespace/src/keys/mod.rs +++ b/engine/packages/namespace/src/keys/mod.rs @@ -2,8 +2,6 @@ use anyhow::Result; use gas::prelude::*; use universaldb::prelude::*; -pub mod runner_config; - pub fn subspace() -> universaldb::utils::Subspace { universaldb::utils::Subspace::new(&(RIVET, NAMESPACE)) } diff --git a/engine/packages/namespace/src/ops/mod.rs b/engine/packages/namespace/src/ops/mod.rs index f08fd1f5b5..74fc79b4a9 100644 --- a/engine/packages/namespace/src/ops/mod.rs +++ b/engine/packages/namespace/src/ops/mod.rs @@ -3,4 +3,3 @@ pub mod get_local; pub mod list; pub mod resolve_for_name_global; pub mod resolve_for_name_local; -pub mod runner_config; diff --git 
a/engine/packages/pegboard-serverless/Cargo.toml b/engine/packages/pegboard-serverless/Cargo.toml deleted file mode 100644 index 6112a75754..0000000000 --- a/engine/packages/pegboard-serverless/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "pegboard-serverless" -version.workspace = true -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -anyhow.workspace = true -base64.workspace = true -epoxy.workspace = true -gas.workspace = true -reqwest-eventsource.workspace = true -reqwest.workspace = true -rivet-config.workspace = true -rivet-runner-protocol.workspace = true -rivet-types.workspace = true -rivet-util.workspace = true -tracing.workspace = true -universaldb.workspace = true -universalpubsub.workspace = true -vbare.workspace = true - -namespace.workspace = true -pegboard.workspace = true diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs deleted file mode 100644 index 19d9c9386c..0000000000 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ /dev/null @@ -1,524 +0,0 @@ -use std::{ - collections::HashMap, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, -}; - -use anyhow::{Context, Result}; -use base64::Engine; -use base64::engine::general_purpose::STANDARD as BASE64; -use futures_util::{StreamExt, TryStreamExt}; -use gas::prelude::*; -use pegboard::keys; -use reqwest::header::{HeaderName, HeaderValue}; -use reqwest_eventsource as sse; -use rivet_runner_protocol as protocol; -use rivet_types::runner_configs::RunnerConfigKind; -use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; -use universaldb::options::StreamingMode; -use universaldb::utils::IsolationLevel::*; -use universalpubsub::PublishOpts; -use vbare::OwnedVersionedData; - -const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint"); -const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token"); -const X_RIVET_TOTAL_SLOTS: HeaderName = 
HeaderName::from_static("x-rivet-total-slots"); -const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); -const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name"); - -const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5); - -struct OutboundConnection { - handle: JoinHandle<()>, - shutdown_tx: oneshot::Sender<()>, - draining: Arc, -} - -#[tracing::instrument(skip_all)] -pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { - let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; - let ctx = StandaloneCtx::new( - db::DatabaseKv::from_pools(pools.clone()).await?, - config.clone(), - pools, - cache, - "pegboard-serverless", - Id::new_v1(config.dc_label()), - Id::new_v1(config.dc_label()), - )?; - - let mut sub = ctx - .subscribe::(()) - .await?; - let mut outbound_connections = HashMap::new(); - - loop { - tick(&ctx, &mut outbound_connections).await?; - - sub.next().await?; - } -} - -async fn tick( - ctx: &StandaloneCtx, - outbound_connections: &mut HashMap<(Id, String), Vec>, -) -> Result<()> { - let serverless_data = ctx - .udb()? 
- .run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); - - let serverless_desired_subspace = keys::subspace().subspace( - &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::entire_subspace(), - ); - - tx.get_ranges_keyvalues( - universaldb::RangeOption { - mode: StreamingMode::WantAll, - ..(&serverless_desired_subspace).into() - }, - // NOTE: This is a snapshot to prevent conflict with updates to this subspace - Snapshot, - ) - .map(|res| match res { - Ok(entry) => { - let (key, desired_slots) = - tx.read_entry::(&entry)?; - - Ok((key.namespace_id, key.runner_name, desired_slots)) - } - Err(err) => Err(err.into()), - }) - .try_collect::>() - .await - }) - .custom_instrument(tracing::info_span!("tick_tx")) - .await?; - - let runner_configs = ctx - .op(namespace::ops::runner_config::get::Input { - runners: serverless_data - .iter() - .map(|(ns_id, runner_name, _)| (*ns_id, runner_name.clone())) - .collect(), - bypass_cache: true, - }) - .await?; - - // Process each runner config with error handling - for (ns_id, runner_name, desired_slots) in &serverless_data { - let runner_config = runner_configs - .iter() - .find(|rc| rc.namespace_id == *ns_id && &rc.name == runner_name); - - let Some(runner_config) = runner_config else { - tracing::debug!( - ?ns_id, - ?runner_name, - "runner config not found, likely deleted" - ); - continue; - }; - - if let Err(err) = tick_runner_config( - ctx, - *ns_id, - runner_name.clone(), - *desired_slots, - runner_config, - outbound_connections, - ) - .await - { - tracing::error!( - ?ns_id, - ?runner_name, - ?err, - "failed to process runner config, continuing with others" - ); - // Continue processing other runner configs even if this one failed - continue; - } - } - - // Remove entries that aren't returned from udb - outbound_connections.retain(|(ns_id, runner_name), _| { - serverless_data - .iter() - .any(|(ns_id2, runner_name2, _)| ns_id == ns_id2 && runner_name == runner_name2) - }); - - tracing::debug!( - 
connection_counts=?outbound_connections.iter().map(|(k, v)| (k, v.len())).collect::>(), - ); - - Ok(()) -} - -async fn tick_runner_config( - ctx: &StandaloneCtx, - ns_id: Id, - runner_name: String, - desired_slots: i64, - runner_config: &namespace::ops::runner_config::get::RunnerConfig, - outbound_connections: &mut HashMap<(Id, String), Vec>, -) -> Result<()> { - let namespace = ctx - .op(namespace::ops::get_global::Input { - namespace_ids: vec![ns_id.clone()], - }) - .await - .context("runner namespace not found")?; - let namespace = namespace.first().context("runner namespace not found")?; - let namespace_name = &namespace.name; - - let RunnerConfigKind::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } = &runner_config.config.kind - else { - tracing::debug!("not serverless config"); - return Ok(()); - }; - - let curr = outbound_connections - .entry((ns_id, runner_name.clone())) - .or_insert_with(Vec::new); - - // Remove finished and draining connections from list - curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); - - // Log warning and reset to 0 if negative - let adjusted_desired_slots = if desired_slots < 0 { - tracing::error!( - ?ns_id, - ?runner_name, - ?desired_slots, - "negative desired slots, scaling to 0" - ); - 0 - } else { - desired_slots - }; - - let desired_count = - (rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64) - .max(*min_runners as i64) - + *runners_margin as i64) - .min(*max_runners as i64) - .try_into()?; - - // Calculate diff - let drain_count = curr.len().saturating_sub(desired_count); - let start_count = desired_count.saturating_sub(curr.len()); - - tracing::debug!(%namespace_name, %runner_name, %desired_count, %drain_count, %start_count, "scaling"); - - if drain_count != 0 { - // TODO: Implement smart logic of draining runners with the lowest allocated actors - let draining_connections = 
curr.split_off(desired_count); - - for conn in draining_connections { - if conn.shutdown_tx.send(()).is_err() { - tracing::debug!( - "serverless connection shutdown channel dropped, likely already stopped" - ); - } - } - } - - let starting_connections = std::iter::repeat_with(|| { - spawn_connection( - ctx.clone(), - url.clone(), - headers.clone(), - Duration::from_secs(*request_lifespan as u64), - *slots_per_runner, - runner_name.clone(), - namespace_name.clone(), - ) - }) - .take(start_count); - curr.extend(starting_connections); - - Ok(()) -} - -fn spawn_connection( - ctx: StandaloneCtx, - url: String, - headers: HashMap, - request_lifespan: Duration, - slots_per_runner: u32, - runner_name: String, - namespace_name: String, -) -> OutboundConnection { - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - let draining = Arc::new(AtomicBool::new(false)); - - let draining2 = draining.clone(); - let handle = tokio::spawn(async move { - if let Err(err) = outbound_handler( - &ctx, - url, - headers, - request_lifespan, - slots_per_runner, - runner_name, - namespace_name, - shutdown_rx, - draining2, - ) - .await - { - tracing::warn!(?err, "outbound req failed"); - - // TODO: Add backoff - tokio::time::sleep(Duration::from_secs(1)).await; - - // On error, bump the autoscaler loop again - let _ = ctx - .msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await; - } - }); - - OutboundConnection { - handle, - shutdown_tx, - draining, - } -} - -async fn outbound_handler( - ctx: &StandaloneCtx, - url: String, - headers: HashMap, - request_lifespan: Duration, - slots_per_runner: u32, - runner_name: String, - namespace_name: String, - shutdown_rx: oneshot::Receiver<()>, - draining: Arc, -) -> Result<()> { - let current_dc = ctx.config().topology().current_dc()?; - - let client = rivet_pools::reqwest::client_no_timeout().await?; - - let token = if let Some(auth) = &ctx.config().auth { - Some(( - X_RIVET_TOKEN, - 
HeaderValue::try_from(auth.admin_token.read())?, - )) - } else { - None - }; - - let headers = headers - .into_iter() - .flat_map(|(k, v)| { - // NOTE: This will filter out invalid headers without warning - Some(( - k.parse::().ok()?, - v.parse::().ok()?, - )) - }) - .chain([ - ( - X_RIVET_ENDPOINT, - HeaderValue::try_from(current_dc.public_url.to_string())?, - ), - ( - X_RIVET_TOTAL_SLOTS, - HeaderValue::try_from(slots_per_runner)?, - ), - (X_RIVET_RUNNER_NAME, HeaderValue::try_from(runner_name)?), - ( - X_RIVET_NAMESPACE_NAME, - HeaderValue::try_from(namespace_name.clone())?, - ), - // Deprecated - ( - HeaderName::from_static("x-rivet-namespace-id"), - HeaderValue::try_from(namespace_name)?, - ), - ]) - .chain(token) - .collect(); - - let endpoint_url = format!("{}/start", url.trim_end_matches('/')); - tracing::debug!(%endpoint_url, "sending outbound req"); - let req = client.get(endpoint_url).headers(headers); - - let mut source = sse::EventSource::new(req).context("failed creating event source")?; - let mut runner_id = None; - - let stream_handler = async { - while let Some(event) = source.next().await { - match event { - Ok(sse::Event::Open) => {} - Ok(sse::Event::Message(msg)) => { - tracing::debug!(%msg.data, "received outbound req message"); - - if runner_id.is_none() { - let data = BASE64.decode(msg.data).context("invalid base64 message")?; - let payload = - protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data) - .context("invalid payload")?; - - match payload { - protocol::ToServerlessServer::ToServerlessServerInit(init) => { - runner_id = - Some(Id::parse(&init.runner_id).context("invalid runner id")?); - } - } - } - } - Err(sse::Error::StreamEnded) => { - tracing::debug!(?runner_id, "outbound req stopped early"); - - return Ok(()); - } - Err(sse::Error::InvalidStatusCode(code, res)) => { - let body = res - .text() - .await - .unwrap_or_else(|_| "".to_string()); - bail!( - "invalid status code ({code}):\n{}", - 
util::safe_slice(&body, 0, 512) - ); - } - Err(err) => return Err(err.into()), - } - } - - anyhow::Ok(()) - }; - - let sleep_until_drop = request_lifespan.saturating_sub(DRAIN_GRACE_PERIOD); - tokio::select! { - res = stream_handler => return res.map_err(Into::into), - _ = tokio::time::sleep(sleep_until_drop) => {} - _ = shutdown_rx => {} - } - - draining.store(true, Ordering::SeqCst); - - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await?; - - if let Some(runner_id) = runner_id { - drain_runner(ctx, runner_id).await?; - } - - // Continue waiting on req while draining - let wait_for_shutdown_fut = async move { - while let Some(event) = source.next().await { - match event { - Ok(sse::Event::Open) => {} - Ok(sse::Event::Message(msg)) => { - tracing::debug!(%msg.data, ?runner_id, "received outbound req message"); - - // If runner_id is none at this point it means we did not send the stopping signal yet, so - // send it now - if runner_id.is_none() { - let data = BASE64.decode(msg.data).context("invalid base64 message")?; - let payload = - protocol::versioned::ToServerlessServer::deserialize_with_embedded_version( - &data, - ) - .context("invalid payload")?; - - match payload { - protocol::ToServerlessServer::ToServerlessServerInit(init) => { - let runner_id_local = - Id::parse(&init.runner_id).context("invalid runner id")?; - runner_id = Some(runner_id_local); - drain_runner(ctx, runner_id_local).await?; - } - } - } - } - Err(sse::Error::StreamEnded) => break, - Err(err) => return Err(err.into()), - } - } - - Result::<()>::Ok(()) - }; - - // Wait for runner to shut down - tokio::select! 
{ - res = wait_for_shutdown_fut => return res.map_err(Into::into), - _ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => { - tracing::debug!(?runner_id, "reached drain grace period before runner shut down") - } - } - - // Close connection - // - // This will force the runner to stop the request in order to avoid hitting the serverless - // timeout threshold - if let Some(runner_id) = runner_id { - publish_to_client_stop(ctx, runner_id).await?; - } - - tracing::debug!(?runner_id, "outbound req stopped"); - - Ok(()) -} - -async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { - let res = ctx - .signal(pegboard::workflows::runner::Stop { - reset_actor_rescheduling: true, - }) - .to_workflow::() - .tag("runner_id", runner_id) - .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res - .as_ref() - .err() - .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) - { - tracing::warn!( - ?runner_id, - "runner workflow not found, likely already stopped" - ); - } else { - res?; - } - - Ok(()) -} - -/// Send a stop message to the client. -/// -/// This will close the runner's WebSocket. -async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { - let receiver_subject = - pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string(); - - let message_serialized = rivet_runner_protocol::versioned::ToClient::wrap_latest( - rivet_runner_protocol::ToClient::ToClientClose, - ) - .serialize_with_embedded_version(rivet_runner_protocol::PROTOCOL_VERSION)?; - - ctx.ups()? 
- .publish(&receiver_subject, &message_serialized, PublishOpts::one()) - .await?; - - Ok(()) -} diff --git a/engine/packages/pegboard/Cargo.toml b/engine/packages/pegboard/Cargo.toml index 9204944e14..f04605f837 100644 --- a/engine/packages/pegboard/Cargo.toml +++ b/engine/packages/pegboard/Cargo.toml @@ -13,6 +13,8 @@ gas.workspace = true lazy_static.workspace = true namespace.workspace = true nix.workspace = true +reqwest-eventsource.workspace = true +reqwest.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true rivet-data.workspace = true @@ -27,6 +29,7 @@ strum.workspace = true tracing.workspace = true universaldb.workspace = true universalpubsub.workspace = true +url.workspace = true utoipa.workspace = true uuid.workspace = true vbare.workspace = true diff --git a/engine/packages/pegboard/src/errors.rs b/engine/packages/pegboard/src/errors.rs index 62ab7138a3..3855b7c30c 100644 --- a/engine/packages/pegboard/src/errors.rs +++ b/engine/packages/pegboard/src/errors.rs @@ -74,3 +74,20 @@ pub enum Runner { #[error("not_found", "The runner does not exist.")] NotFound, } + +#[derive(RivetError, Debug, Deserialize, Serialize)] +#[error("runner_config")] +pub enum RunnerConfig { + #[error("invalid", "Invalid runner config.", "Invalid runner config: {reason}")] + Invalid { reason: String }, + + #[error("not_found", "No config for this runner exists.")] + NotFound, +} + +#[derive(RivetError, Debug, Deserialize, Serialize)] +#[error("serverless_runner_pool")] +pub enum ServerlessRunnerPool { + #[error("not_found", "No serverless pool for this runner exists.")] + NotFound, +} diff --git a/engine/packages/pegboard/src/keys/mod.rs b/engine/packages/pegboard/src/keys/mod.rs index 232e133bd1..cd8b69c235 100644 --- a/engine/packages/pegboard/src/keys/mod.rs +++ b/engine/packages/pegboard/src/keys/mod.rs @@ -5,6 +5,7 @@ pub mod epoxy; pub mod hibernating_request; pub mod ns; pub mod runner; +pub mod runner_config; pub fn subspace() -> 
universaldb::utils::Subspace { rivet_types::keys::pegboard::subspace() diff --git a/engine/packages/namespace/src/keys/runner_config.rs b/engine/packages/pegboard/src/keys/runner_config.rs similarity index 100% rename from engine/packages/namespace/src/keys/runner_config.rs rename to engine/packages/pegboard/src/keys/runner_config.rs diff --git a/engine/packages/pegboard/src/lib.rs b/engine/packages/pegboard/src/lib.rs index a776a3d227..de4121239a 100644 --- a/engine/packages/pegboard/src/lib.rs +++ b/engine/packages/pegboard/src/lib.rs @@ -14,6 +14,9 @@ pub fn registry() -> WorkflowResult { let mut registry = Registry::new(); registry.register_workflow::()?; registry.register_workflow::()?; + registry.register_workflow::()?; + registry.register_workflow::()?; + registry.register_workflow::()?; Ok(registry) } diff --git a/engine/packages/pegboard/src/ops/mod.rs b/engine/packages/pegboard/src/ops/mod.rs index f8878061f9..f8263b10dd 100644 --- a/engine/packages/pegboard/src/ops/mod.rs +++ b/engine/packages/pegboard/src/ops/mod.rs @@ -1,2 +1,3 @@ pub mod actor; pub mod runner; +pub mod runner_config; diff --git a/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs b/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs index bb8f3a2efc..60a77057ca 100644 --- a/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs +++ b/engine/packages/pegboard/src/ops/runner/find_dc_with_runner.rs @@ -73,7 +73,7 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result< // Check if a serverless runner config with a max runners > 0 exists let res = ctx - .op(namespace::ops::runner_config::get::Input { + .op(crate::ops::runner_config::get::Input { runners: vec![(input.namespace_id, input.runner_name.clone())], bypass_cache: false, }) diff --git a/engine/packages/namespace/src/ops/runner_config/delete.rs b/engine/packages/pegboard/src/ops/runner_config/delete.rs similarity index 67% rename from 
engine/packages/namespace/src/ops/runner_config/delete.rs rename to engine/packages/pegboard/src/ops/runner_config/delete.rs index b5f7e4597d..ae78039ffd 100644 --- a/engine/packages/namespace/src/ops/runner_config/delete.rs +++ b/engine/packages/pegboard/src/ops/runner_config/delete.rs @@ -10,17 +10,17 @@ pub struct Input { } #[operation] -pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> { - let bump_autoscaler = ctx +pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> { + let delete_workflow = ctx .udb()? .run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); // Read existing config to determine variant let runner_config_key = keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); - let bump_autoscaler = + let delete_workflow = if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? { tx.delete(&runner_config_key); @@ -37,14 +37,17 @@ pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) - false }; - Ok(bump_autoscaler) + Ok(delete_workflow) }) .custom_instrument(tracing::info_span!("runner_config_delete_tx")) .await?; // Bump autoscaler when a serverless config is modified - if bump_autoscaler { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + if delete_workflow { + ctx.signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.name.clone()) .send() .await?; } diff --git a/engine/packages/namespace/src/ops/runner_config/get.rs b/engine/packages/pegboard/src/ops/runner_config/get.rs similarity index 95% rename from engine/packages/namespace/src/ops/runner_config/get.rs rename to engine/packages/pegboard/src/ops/runner_config/get.rs index 75751c8769..469506e74e 100644 --- a/engine/packages/namespace/src/ops/runner_config/get.rs +++ 
b/engine/packages/pegboard/src/ops/runner_config/get.rs @@ -19,7 +19,7 @@ pub struct RunnerConfig { } #[operation] -pub async fn namespace_runner_config_get( +pub async fn pegboard_runner_config_get( ctx: &OperationCtx, input: &Input, ) -> Result> { @@ -62,7 +62,7 @@ async fn runner_config_get_inner( let tx = tx.clone(); async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); let runner_config_key = keys::runner_config::DataKey::new( namespace_id, diff --git a/engine/packages/namespace/src/ops/runner_config/list.rs b/engine/packages/pegboard/src/ops/runner_config/list.rs similarity index 91% rename from engine/packages/namespace/src/ops/runner_config/list.rs rename to engine/packages/pegboard/src/ops/runner_config/list.rs index c0fcf8de95..7b6f152b1d 100644 --- a/engine/packages/namespace/src/ops/runner_config/list.rs +++ b/engine/packages/pegboard/src/ops/runner_config/list.rs @@ -17,17 +17,17 @@ pub struct Input { // TODO: Needs to return default configs if they exist (currently no way to list from epoxy) #[operation] -pub async fn namespace_runner_config_list( +pub async fn pegboard_runner_config_list( ctx: &OperationCtx, input: &Input, ) -> Result> { let runner_configs = ctx .udb()? 
.run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); let (start, end) = if let Some(variant) = input.variant { - let (start, end) = keys::subspace() + let (start, end) = namespace::keys::subspace() .subspace(&keys::runner_config::ByVariantKey::subspace_with_variant( input.namespace_id, variant, @@ -46,7 +46,7 @@ pub async fn namespace_runner_config_list( (start, end) } else { - let (start, end) = keys::subspace() + let (start, end) = namespace::keys::subspace() .subspace(&keys::runner_config::DataKey::subspace(input.namespace_id)) .range(); diff --git a/engine/packages/namespace/src/ops/runner_config/mod.rs b/engine/packages/pegboard/src/ops/runner_config/mod.rs similarity index 100% rename from engine/packages/namespace/src/ops/runner_config/mod.rs rename to engine/packages/pegboard/src/ops/runner_config/mod.rs diff --git a/engine/packages/namespace/src/ops/runner_config/upsert.rs b/engine/packages/pegboard/src/ops/runner_config/upsert.rs similarity index 67% rename from engine/packages/namespace/src/ops/runner_config/upsert.rs rename to engine/packages/pegboard/src/ops/runner_config/upsert.rs index 3b745d901a..dc5235cf56 100644 --- a/engine/packages/namespace/src/ops/runner_config/upsert.rs +++ b/engine/packages/pegboard/src/ops/runner_config/upsert.rs @@ -11,18 +11,23 @@ pub struct Input { pub config: RunnerConfig, } +struct UpsertOutput { + endpoint_config_changed: bool, + serverless_runner_created: bool, +} + #[operation] -pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result { - let endpoint_config_changed = ctx +pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result { + let res = ctx .udb()? 
.run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); + let tx = tx.with_subspace(namespace::keys::subspace()); let runner_config_key = keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); // Check if config changed (for serverless, compare URL and headers) - let endpoint_config_changed = if let Some(existing_config) = + let output = if let Some(existing_config) = tx.read_opt(&runner_config_key, Serializable).await? { // Delete previous index @@ -45,15 +50,34 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - headers: new_headers, .. }, - ) => old_url != new_url || old_headers != new_headers, + ) => UpsertOutput { + endpoint_config_changed: old_url != new_url || old_headers != new_headers, + serverless_runner_created: false, + }, + (RunnerConfigKind::Normal { .. }, RunnerConfigKind::Serverless { .. }) => { + // Config type changed to serverless + UpsertOutput { + endpoint_config_changed: true, + serverless_runner_created: true, + } + } _ => { - // Config type changed or is not serverless - true + // Not serverless + UpsertOutput { + endpoint_config_changed: true, + serverless_runner_created: false, + } } } } else { // New config - true + UpsertOutput { + endpoint_config_changed: true, + serverless_runner_created: matches!( + input.config.kind, + RunnerConfigKind::Serverless { .. } + ), + } }; // Write new config @@ -131,18 +155,32 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - } } - Ok(Ok(endpoint_config_changed)) + Ok(Ok(output)) }) .custom_instrument(tracing::info_span!("runner_config_upsert_tx")) .await? 
.map_err(|err| err.build())?; // Bump autoscaler - if input.config.affects_autoscaler() { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + if res.serverless_runner_created { + ctx.workflow(crate::workflows::serverless::pool::Input { + namespace_id: input.namespace_id, + runner_name: input.name.clone(), + }) + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.name.clone()) + .unique() + .dispatch() + .await?; + } else if input.config.affects_autoscaler() { + // Maybe scale it + ctx.signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.name.clone()) .send() .await?; } - Ok(endpoint_config_changed) + Ok(res.endpoint_config_changed) } diff --git a/engine/packages/pegboard/src/utils.rs b/engine/packages/pegboard/src/utils.rs index 61bba809ae..d86dbaf6aa 100644 --- a/engine/packages/pegboard/src/utils.rs +++ b/engine/packages/pegboard/src/utils.rs @@ -1,4 +1,8 @@ use rivet_runner_protocol as protocol; +use rivet_types::{ + keys::namespace::runner_config::RunnerConfigVariant, + runner_configs::{RunnerConfig, RunnerConfigKind}, +}; pub fn event_actor_id(event: &protocol::Event) -> &str { match event { @@ -27,3 +31,10 @@ pub fn event_generation(event: &protocol::Event) -> u32 { }) => *generation, } } + +pub fn runner_config_variant(runner_config: &RunnerConfig) -> RunnerConfigVariant { + match runner_config.kind { + RunnerConfigKind::Normal { .. } => RunnerConfigVariant::Normal, + RunnerConfigKind::Serverless { .. 
} => RunnerConfigVariant::Serverless, + } +} diff --git a/engine/packages/pegboard/src/workflows/actor/destroy.rs b/engine/packages/pegboard/src/workflows/actor/destroy.rs index 76ece1b0f2..4477e202f6 100644 --- a/engine/packages/pegboard/src/workflows/actor/destroy.rs +++ b/engine/packages/pegboard/src/workflows/actor/destroy.rs @@ -32,9 +32,31 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input) // If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down // if needed if res.allocated_serverless_slot { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + + let bump_res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", res.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = bump_res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%res.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + bump_res?; + } } // Clear KV @@ -59,6 +81,7 @@ struct UpdateStateAndDbInput { #[derive(Debug, Serialize, Deserialize, Hash)] struct UpdateStateAndDbOutput { allocated_serverless_slot: bool, + runner_name_selector: String, } #[activity(UpdateStateAndDb)] @@ -139,6 +162,7 @@ async fn update_state_and_db( Ok(UpdateStateAndDbOutput { allocated_serverless_slot: old_allocated_serverless_slot, + runner_name_selector: state.runner_name_selector.clone(), }) } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 4f555e3a5d..d3cd59c350 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ 
-608,9 +608,31 @@ async fn handle_stopped( if allocate_pending_res.allocations.is_empty() { // Bump autoscaler so it can scale down if needed if deallocate_res.for_serverless { - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + + let res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + res?; + } } } else { // Dispatch pending allocs (if any) @@ -793,3 +815,6 @@ join_signal!(Main { GoingAway, Destroy, }); + +#[message("pegboard_bump_serverless_autoscaler")] +pub(crate) struct BumpServerlessAutoscalerStub {} diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index ce9bc1acec..66967ee90a 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -1,3 +1,4 @@ +// TODO: review how the runner workflow handles signal delivery failures use base64::Engine; use base64::prelude::BASE64_STANDARD; use futures_util::StreamExt; @@ -114,7 +115,7 @@ async fn update_runner(ctx: &ActivityCtx, input: &UpdateRunnerInput) -> Result<( } #[derive(Debug, Serialize, Deserialize, Hash)] -struct AllocateActorInput { +struct AllocateActorInputV1 { actor_id: Id, generation: u32, /// When set, forces actors with CrashPolicy::Sleep to pend instead of sleep. 
@@ -123,7 +124,7 @@ struct AllocateActorInput { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -pub enum AllocateActorOutput { +pub enum AllocateActorOutputV1 { Allocated { runner_id: Id, runner_workflow_id: Id, @@ -134,12 +135,69 @@ pub enum AllocateActorOutput { Sleep, } -// If no availability, returns the timestamp of the actor's queue key #[activity(AllocateActor)] async fn allocate_actor( ctx: &ActivityCtx, - input: &AllocateActorInput, -) -> Result { + input: &AllocateActorInputV1, +) -> Result { + bail!("allocate actor v1 should never be called again") +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct AllocateActorInputV2 { + actor_id: Id, + generation: u32, + force_allocate: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +struct AllocateActorOutputV2 { + status: AllocateActorStatus, + serverless: bool, +} + +impl From for AllocateActorOutputV2 { + fn from(value: AllocateActorOutputV1) -> Self { + Self { + serverless: false, + status: match value { + AllocateActorOutputV1::Allocated { + runner_id, + runner_workflow_id, + } => AllocateActorStatus::Allocated { + runner_id, + runner_workflow_id, + }, + AllocateActorOutputV1::Pending { + pending_allocation_ts, + } => AllocateActorStatus::Pending { + pending_allocation_ts, + }, + AllocateActorOutputV1::Sleep => AllocateActorStatus::Sleep, + }, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum AllocateActorStatus { + Allocated { + runner_id: Id, + runner_workflow_id: Id, + }, + Pending { + pending_allocation_ts: i64, + }, + Sleep, +} + +// If no availability, returns the timestamp of the actor's queue key +#[activity(AllocateActorV2)] +async fn allocate_actor_v2( + ctx: &ActivityCtx, + input: &AllocateActorInputV2, +) -> Result { let start_instant = Instant::now(); let mut state = ctx.state::()?; @@ -149,7 +207,7 @@ async fn allocate_actor( // Check if valid serverless config exists for the current ns + runner name let 
runner_config_res = ctx - .op(namespace::ops::runner_config::get::Input { + .op(crate::ops::runner_config::get::Input { runners: vec![(namespace_id, runner_name_selector.clone())], bypass_cache: false, }) @@ -166,7 +224,7 @@ async fn allocate_actor( // NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the // client wf - let (for_serverless, res) = ctx + let res = ctx .udb()? .run(|tx| async move { let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold; @@ -175,7 +233,7 @@ async fn allocate_actor( let for_serverless = tx .with_subspace(namespace::keys::subspace()) .exists( - &namespace::keys::runner_config::ByVariantKey::new( + &keys::runner_config::ByVariantKey::new( namespace_id, RunnerConfigVariant::Serverless, runner_name_selector.clone(), @@ -318,22 +376,23 @@ async fn allocate_actor( // Set actor as not sleeping tx.delete(&keys::actor::SleepTsKey::new(input.actor_id)); - return Ok(( - for_serverless, - AllocateActorOutput::Allocated { + return Ok(AllocateActorOutputV2 { + serverless: for_serverless, + status: AllocateActorStatus::Allocated { runner_id: old_runner_alloc_key.runner_id, runner_workflow_id: old_runner_alloc_key_data.workflow_id, }, - )); + }); } } // At this point in the txn there is no availability match (crash_policy, input.force_allocate, has_valid_serverless) { - (CrashPolicy::Sleep, false, false) => { - Ok((for_serverless, AllocateActorOutput::Sleep)) - } + (CrashPolicy::Sleep, false, false) => Ok(AllocateActorOutputV2 { + serverless: for_serverless, + status: AllocateActorStatus::Sleep, + }), // Write the actor to the alloc queue to wait _ => { let pending_allocation_ts = util::timestamp::now(); @@ -351,12 +410,12 @@ async fn allocate_actor( input.generation, )?; - Ok(( - for_serverless, - AllocateActorOutput::Pending { + Ok(AllocateActorOutputV2 { + serverless: for_serverless, + status: AllocateActorStatus::Pending { pending_allocation_ts, }, - )) + }) } } }) @@ -368,25 
+427,27 @@ async fn allocate_actor( dt, &[KeyValue::new( "did_reserve", - matches!(res, AllocateActorOutput::Allocated { .. }).to_string(), + matches!(res.status, AllocateActorStatus::Allocated { .. }).to_string(), )], ); - state.for_serverless = for_serverless; - state.allocated_serverless_slot = for_serverless; + state.for_serverless = res.serverless; + state.allocated_serverless_slot = res.serverless; - match &res { - AllocateActorOutput::Allocated { + match &res.status { + AllocateActorStatus::Allocated { runner_id, runner_workflow_id, + .. } => { state.sleep_ts = None; state.pending_allocation_ts = None; state.runner_id = Some(*runner_id); state.runner_workflow_id = Some(*runner_workflow_id); } - AllocateActorOutput::Pending { + AllocateActorStatus::Pending { pending_allocation_ts, + .. } => { tracing::warn!( actor_id=?input.actor_id, @@ -395,7 +456,7 @@ async fn allocate_actor( state.pending_allocation_ts = Some(*pending_allocation_ts); } - AllocateActorOutput::Sleep => {} + AllocateActorStatus::Sleep => {} } Ok(res) @@ -495,25 +556,62 @@ pub async fn spawn_actor( generation: u32, force_allocate: bool, ) -> Result { - // Attempt allocation - let allocate_res = ctx - .activity(AllocateActorInput { - actor_id: input.actor_id, - generation, - force_allocate, - }) - .await?; + let allocate_res: AllocateActorOutputV2 = match ctx.check_version(2).await? { + 1 => { + // Attempt allocation + ctx.activity(AllocateActorInputV1 { + actor_id: input.actor_id, + generation, + force_allocate, + }) + .await? + .into() + } + _latest => { + ctx.v(2) + .activity(AllocateActorInputV2 { + actor_id: input.actor_id, + generation, + force_allocate, + }) + .await? 
+ } + }; - match allocate_res { - AllocateActorOutput::Allocated { + match allocate_res.status { + AllocateActorStatus::Allocated { runner_id, runner_workflow_id, } => { // Bump the autoscaler so it can scale up - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + if allocate_res.serverless { + let res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + res?; + } + } + ctx.signal(crate::workflows::runner::Command { inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { actor_id: input.actor_id.to_string(), @@ -544,14 +642,38 @@ pub async fn spawn_actor( runner_workflow_id, }) } - AllocateActorOutput::Pending { + AllocateActorStatus::Pending { pending_allocation_ts, } => { - // Bump the autoscaler so it can scale up - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() + ctx.removed::>() .await?; + // Bump the autoscaler so it can scale up + if allocate_res.serverless { + let res = ctx + .v(2) + .signal(crate::workflows::serverless::pool::Bump {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely 
deleted" + ); + } else { + res?; + } + } + // If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for // an `Allocate` signal match ctx.listen::().await? { @@ -620,7 +742,7 @@ pub async fn spawn_actor( } } } - AllocateActorOutput::Sleep => Ok(SpawnActorOutput::Sleep), + AllocateActorStatus::Sleep => Ok(SpawnActorOutput::Sleep), } } diff --git a/engine/packages/pegboard/src/workflows/mod.rs b/engine/packages/pegboard/src/workflows/mod.rs index f8878061f9..d55cc7acfb 100644 --- a/engine/packages/pegboard/src/workflows/mod.rs +++ b/engine/packages/pegboard/src/workflows/mod.rs @@ -1,2 +1,3 @@ pub mod actor; pub mod runner; +pub mod serverless; diff --git a/engine/packages/pegboard/src/workflows/serverless/connection.rs b/engine/packages/pegboard/src/workflows/serverless/connection.rs new file mode 100644 index 0000000000..6402638906 --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/connection.rs @@ -0,0 +1,505 @@ +use anyhow::Context; +use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; +use futures_util::{FutureExt, StreamExt}; +use gas::prelude::*; +use reqwest::header::{HeaderName, HeaderValue}; +use reqwest_eventsource as sse; +use rivet_runner_protocol as protocol; +use rivet_types::runner_configs::RunnerConfigKind; +use std::time::Instant; +use tokio::time::Duration; +use universalpubsub::PublishOpts; +use vbare::OwnedVersionedData; + +use super::{pool, runner}; +use crate::pubsub_subjects::RunnerReceiverSubject; + +const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint"); +const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token"); +const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots"); +const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); +const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name"); + +const DRAIN_GRACE_PERIOD: Duration = 
Duration::from_secs(5); + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Input { + pub pool_wf_id: Id, + pub runner_wf_id: Id, + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Default)] +struct RescheduleState { + last_retry_ts: i64, + retry_count: usize, +} + +#[workflow] +pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { + // Run the connection activity, which will handle the full lifecycle + let send_drain_started = ctx + .loope(RescheduleState::default(), |ctx, state| { + let input = input.clone(); + + async move { + let res = ctx + .activity(OutboundReqInput { + pool_wf_id: input.pool_wf_id, + runner_wf_id: input.runner_wf_id, + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .await?; + + if let OutboundReqOutput::Done(res) = res { + return Ok(Loop::Break(res.send_drain_started)); + } + + let mut backoff = reconnect_backoff( + state.retry_count, + ctx.config().pegboard().serverless_base_retry_timeout(), + ctx.config().pegboard().serverless_backoff_max_exponent(), + ); + + let retry_res = ctx + .activity(CompareRetryInput { + retry_count: state.retry_count, + last_retry_ts: state.last_retry_ts, + }) + .await?; + + state.retry_count = if retry_res.should_reset { + 0 + } else { + state.retry_count + 1 + }; + state.last_retry_ts = retry_res.now; + + let next = backoff.step().expect("should not have max retry"); + if let Some(_sig) = ctx + .listen_with_timeout::(Instant::from(next) - Instant::now()) + .await? 
+ { + tracing::debug!("drain received during serverless connection backoff"); + + // Notify parent that drain started + return Ok(Loop::Break(true)); + } + + Ok(Loop::Continue) + } + .boxed() + }) + .await?; + + // If we failed to send inline during the activity, durably ensure the + // signal is dispatched here + if send_drain_started { + ctx.signal(pool::RunnerDrainStarted { + runner_wf_id: input.runner_wf_id, + }) + .to_workflow_id(input.pool_wf_id) + .send() + .await?; + } + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct CompareRetryInput { + retry_count: usize, + last_retry_ts: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +struct CompareRetryOutput { + should_reset: bool, + now: i64, +} + +#[activity(CompareRetry)] +async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> Result { + let now = util::timestamp::now(); + + // If the last retry ts is more than RETRY_RESET_DURATION_MS ago, reset retry count + let should_reset = + input.last_retry_ts < now - ctx.config().pegboard().serverless_retry_reset_duration(); + + Ok(CompareRetryOutput { should_reset, now }) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct OutboundReqInput { + pool_wf_id: Id, + runner_wf_id: Id, + namespace_id: Id, + runner_name: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct OutboundReqInnerOutput { + send_drain_started: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +enum OutboundReqOutput { + Done(OutboundReqInnerOutput), + NeedsRetry, +} + +#[activity(OutboundReq)] +#[timeout = u64::MAX] +async fn outbound_req(ctx: &ActivityCtx, input: &OutboundReqInput) -> Result { + match outbound_req_inner(ctx, input).await { + Ok(res) => Ok(OutboundReqOutput::Done(res)), + Err(error) => { + tracing::error!(?error, "outbound_req_inner failed, retrying after backoff"); + Ok(OutboundReqOutput::NeedsRetry) + } + } +} + +async fn outbound_req_inner( + ctx: &ActivityCtx, + input: &OutboundReqInput, +) -> Result { + if 
is_runner_draining(ctx, input.runner_wf_id).await? { + return Ok(OutboundReqInnerOutput { + send_drain_started: true, + }); + } + + let mut drain_sub = ctx + .subscribe::(("workflow_id", ctx.workflow_id())) + .await?; + + let (runner_config_res, namespace_res) = tokio::try_join!( + ctx.op(crate::ops::runner_config::get::Input { + runners: vec![(input.namespace_id, input.runner_name.clone())], + bypass_cache: false, + }), + ctx.op(namespace::ops::get_global::Input { + namespace_ids: vec![input.namespace_id], + }) + )?; + let Some(runner_config) = runner_config_res.into_iter().next() else { + tracing::debug!("runner config does not exist, ending outbound req"); + return Ok(OutboundReqInnerOutput { + send_drain_started: true, + }); + }; + + let RunnerConfigKind::Serverless { + url, + headers, + slots_per_runner, + request_lifespan, + .. + } = runner_config.config.kind + else { + tracing::debug!("runner config is not serverless, ending outbound req"); + return Ok(OutboundReqInnerOutput { + send_drain_started: true, + }); + }; + + let namespace = namespace_res + .into_iter() + .next() + .context("runner namespace not found")?; + + let current_dc = ctx.config().topology().current_dc()?; + + let token = if let Some(auth) = &ctx.config().auth { + Some(( + X_RIVET_TOKEN, + HeaderValue::try_from(auth.admin_token.read())?, + )) + } else { + None + }; + + let headers = headers + .into_iter() + .flat_map(|(k, v)| { + // NOTE: This will filter out invalid headers without warning + Some(( + k.parse::().ok()?, + v.parse::().ok()?, + )) + }) + .chain([ + ( + X_RIVET_ENDPOINT, + HeaderValue::try_from(current_dc.public_url.to_string())?, + ), + ( + X_RIVET_TOTAL_SLOTS, + HeaderValue::try_from(slots_per_runner)?, + ), + ( + X_RIVET_RUNNER_NAME, + HeaderValue::try_from(input.runner_name.clone())?, + ), + ( + X_RIVET_NAMESPACE_NAME, + HeaderValue::try_from(namespace.name.clone())?, + ), + // Deprecated + ( + HeaderName::from_static("x-rivet-namespace-id"), + 
HeaderValue::try_from(namespace.name)?, + ), + ]) + .chain(token) + .collect(); + + let endpoint_url = format!("{}/start", url.trim_end_matches('/')); + + tracing::debug!(%endpoint_url, "sending outbound req"); + + let client = rivet_pools::reqwest::client_no_timeout().await?; + let req = client.get(endpoint_url).headers(headers); + + let mut source = sse::EventSource::new(req).context("failed creating event source")?; + let mut runner_id = None; + + let stream_handler = async { + while let Some(event) = source.next().await { + match event { + Ok(sse::Event::Open) => {} + Ok(sse::Event::Message(msg)) => { + tracing::debug!(%msg.data, "received outbound req message"); + + if runner_id.is_none() { + let data = BASE64.decode(msg.data).context("invalid base64 message")?; + let payload = + protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data) + .context("invalid payload")?; + + match payload { + protocol::ToServerlessServer::ToServerlessServerInit(init) => { + runner_id = + Some(Id::parse(&init.runner_id).context("invalid runner id")?); + } + } + } + } + Err(sse::Error::StreamEnded) => { + tracing::debug!("outbound req stopped early"); + + return Ok(()); + } + Err(sse::Error::InvalidStatusCode(code, res)) => { + let body = res + .text() + .await + .unwrap_or_else(|_| "".to_string()); + bail!( + "invalid status code ({code}):\n{}", + util::safe_slice(&body, 0, 512) + ); + } + Err(err) => return Err(err.into()), + } + } + + anyhow::Ok(()) + }; + + let sleep_until_drain = + Duration::from_secs(request_lifespan as u64).saturating_sub(DRAIN_GRACE_PERIOD); + tokio::select! { + res = stream_handler => { + return match res { + Err(e) => Err(e.into()), + // TODO: + // For unexpected closes, we don’t know if the runner connected + // or not bc we can’t correlate the runner id. 
+ // + // Lifecycle state falls apart + Ok(_) => Ok(OutboundReqInnerOutput { + send_drain_started: false + }) + }; + }, + _ = tokio::time::sleep(sleep_until_drain) => {} + _ = drain_sub.next() => {} + }; + + tracing::debug!(?runner_id, "connection reached lifespan, needs draining"); + + if let Err(e) = ctx + .signal(pool::RunnerDrainStarted { + runner_wf_id: input.runner_wf_id, + }) + // This is ok, because we only send DrainStarted once + .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() + .to_workflow_id(input.pool_wf_id) + .send() + .await + { + tracing::warn!( + runner_name=%input.runner_name.clone(), + namespace_id=%input.namespace_id, + workflow_id=%ctx.workflow_id(), + "failed to send signal: {}", e + ); + + // If we failed to send, have the workflow send it durably + return Ok(OutboundReqInnerOutput { + send_drain_started: true, + }); + } + + // After we tell the pool we're draining, any remaining failures + // don't matter as the pool already stopped caring about us. + if let Err(err) = finish_non_critical_draining(ctx, source, runner_id).await { + tracing::debug!(?err, "failed non critical draining phase"); + } + + Ok(OutboundReqInnerOutput { + send_drain_started: false, + }) +} + +async fn is_runner_draining(ctx: &ActivityCtx, runner_wf_id: Id) -> Result { + let runner_wf = ctx + .get_workflows(vec![runner_wf_id]) + .await? 
+ .into_iter() + .next() + .context("cannot find own runner wf")?; + let state = runner_wf.parse_state::()?; + + Ok(state.is_draining) +} + +async fn finish_non_critical_draining( + ctx: &ActivityCtx, + mut source: sse::EventSource, + mut runner_id: Option, +) -> Result<()> { + if let Some(runner_id) = runner_id { + drain_runner(ctx, runner_id).await?; + } + + // Continue waiting on req while draining + let wait_for_shutdown_fut = async move { + while let Some(event) = source.next().await { + match event { + Ok(sse::Event::Open) => {} + Ok(sse::Event::Message(msg)) => { + tracing::debug!(%msg.data, ?runner_id, "received outbound req message"); + + // If runner_id is none at this point it means we did not send the stopping signal yet, so + // send it now + if runner_id.is_none() { + let data = BASE64.decode(msg.data).context("invalid base64 message")?; + let payload = + protocol::versioned::ToServerlessServer::deserialize_with_embedded_version( + &data, + ) + .context("invalid payload")?; + + match payload { + protocol::ToServerlessServer::ToServerlessServerInit(init) => { + let runner_id_local = + Id::parse(&init.runner_id).context("invalid runner id")?; + runner_id = Some(runner_id_local); + drain_runner(ctx, runner_id_local).await?; + } + } + } + } + Err(sse::Error::StreamEnded) => break, + Err(err) => return Err(err.into()), + } + } + + Result::<()>::Ok(()) + }; + + // Wait for runner to shut down + tokio::select! 
{ + res = wait_for_shutdown_fut => return res.map_err(Into::into), + _ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => { + tracing::debug!(?runner_id, "reached drain grace period before runner shut down") + } + } + + // Close connection + // + // This will force the runner to stop the request in order to avoid hitting the serverless + // timeout threshold + if let Some(runner_id) = runner_id { + publish_to_client_stop(ctx, runner_id).await?; + } + + tracing::debug!(?runner_id, "outbound req stopped"); + + Ok(()) +} + +async fn drain_runner(ctx: &ActivityCtx, runner_id: Id) -> Result<()> { + let res = ctx + .signal(crate::workflows::runner::Stop { + reset_actor_rescheduling: true, + }) + // This is ok, because runner_id changes every retry of outbound_req + .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() + .to_workflow::() + .tag("runner_id", runner_id) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + ?runner_id, + "runner workflow not found, likely already stopped" + ); + } else { + res?; + } + + Ok(()) +} + +/// Send a stop message to the client. +/// +/// This will close the runner's WebSocket. +async fn publish_to_client_stop(ctx: &ActivityCtx, runner_id: Id) -> Result<()> { + let receiver_subject = RunnerReceiverSubject::new(runner_id).to_string(); + + let message_serialized = rivet_runner_protocol::versioned::ToClient::wrap_latest( + rivet_runner_protocol::ToClient::ToClientClose, + ) + .serialize_with_embedded_version(rivet_runner_protocol::PROTOCOL_VERSION)?; + + ctx.ups()? 
+ .publish(&receiver_subject, &message_serialized, PublishOpts::one()) + .await?; + + Ok(()) +} + +#[message("pegboard_serverless_connection_drain_msg")] +pub struct DrainMessage {} + +#[signal("pegboard_serverless_connection_drain_sig")] +pub struct DrainSignal {} + +fn reconnect_backoff( + retry_count: usize, + base_retry_timeout: usize, + max_exponent: usize, +) -> util::backoff::Backoff { + util::backoff::Backoff::new_at(max_exponent, None, base_retry_timeout, 500, retry_count) +} diff --git a/engine/packages/pegboard/src/workflows/serverless/mod.rs b/engine/packages/pegboard/src/workflows/serverless/mod.rs new file mode 100644 index 0000000000..6e37f560c3 --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/mod.rs @@ -0,0 +1,3 @@ +pub mod connection; +pub mod pool; +pub mod runner; diff --git a/engine/packages/pegboard/src/workflows/serverless/pool.rs b/engine/packages/pegboard/src/workflows/serverless/pool.rs new file mode 100644 index 0000000000..1b3cb422ed --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/pool.rs @@ -0,0 +1,195 @@ +use futures_util::FutureExt; +use gas::{db::WorkflowData, prelude::*}; +use rivet_types::{keys, runner_configs::RunnerConfigKind}; + +use super::runner; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Input { + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Debug, Serialize, Deserialize, Default)] +struct LifecycleState { + runners: Vec, +} + +#[workflow] +pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { + ctx.loope(LifecycleState::default(), |ctx, state| { + let input = input.clone(); + async move { + // 1. Remove completed connections + let completed_runners = ctx + .activity(GetCompletedInput { + runners: state.runners.clone(), + }) + .await?; + + state.runners.retain(|r| !completed_runners.contains(r)); + + // 2. 
Get desired count -> drain and start counts + let ReadDesiredOutput::Desired(desired_count) = ctx + .activity(ReadDesiredInput { + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .await? + else { + return Ok(Loop::Break(())); + }; + + let drain_count = state.runners.len().saturating_sub(desired_count); + let start_count = desired_count.saturating_sub(state.runners.len()); + + // 3. Drain old runners + if drain_count != 0 { + // TODO: Implement smart logic of draining runners with the lowest allocated actors + let draining_runners = state.runners.iter().take(drain_count).collect::>(); + + for wf_id in draining_runners { + ctx.signal(runner::Drain {}) + .to_workflow_id(*wf_id) + .send() + .await?; + } + } + + // 4. Dispatch new runner workflows + if start_count != 0 { + for _ in 0..start_count { + let wf_id = ctx + .workflow(runner::Input { + pool_wf_id: ctx.workflow_id(), + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name.clone()) + .dispatch() + .await?; + + state.runners.push(wf_id); + } + } + + // Wait for Bump or runner update signals until we tick again + match ctx.listen::
().await? { + Main::RunnerDrainStarted(sig) => { + state.runners.retain(|wf_id| *wf_id != sig.runner_wf_id); + } + Main::Bump(_) => {} + } + + Ok(Loop::Continue) + } + .boxed() + }) + .await?; + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct GetCompletedInput { + runners: Vec, +} + +#[activity(GetCompleted)] +async fn get_completed(ctx: &ActivityCtx, input: &GetCompletedInput) -> Result> { + Ok(ctx + .get_workflows(input.runners.clone()) + .await? + .into_iter() + .filter(WorkflowData::has_output) + .map(|wf| wf.workflow_id) + .collect()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct ReadDesiredInput { + namespace_id: Id, + runner_name: String, +} + +#[derive(Debug, Serialize, Deserialize)] +enum ReadDesiredOutput { + Desired(usize), + Stop, +} + +#[activity(ReadDesired)] +async fn read_desired(ctx: &ActivityCtx, input: &ReadDesiredInput) -> Result { + let runner_config_res = ctx + .op(crate::ops::runner_config::get::Input { + runners: vec![(input.namespace_id, input.runner_name.clone())], + bypass_cache: false, + }) + .await?; + let Some(runner_config) = runner_config_res.into_iter().next() else { + return Ok(ReadDesiredOutput::Stop); + }; + + let RunnerConfigKind::Serverless { + slots_per_runner, + min_runners, + max_runners, + runners_margin, + .. + } = runner_config.config.kind + else { + return Ok(ReadDesiredOutput::Stop); + }; + + let desired_slots = ctx + .udb()? 
+ .run(|tx| async move { + let tx = tx.with_subspace(keys::pegboard::subspace()); + + tx.read( + &keys::pegboard::ns::ServerlessDesiredSlotsKey { + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }, + universaldb::utils::IsolationLevel::Serializable, + ) + .await + }) + .await?; + + let adjusted_desired_slots = if desired_slots < 0 { + tracing::error!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name, + ?desired_slots, + "negative desired slots, scaling to 0" + ); + 0 + } else { + desired_slots + }; + + // Won't overflow as these values are all in u32 range + let desired_count = (runners_margin + + (adjusted_desired_slots as u32).div_ceil(slots_per_runner)) + .max(min_runners) + .min(max_runners) + .try_into()?; + + Ok(ReadDesiredOutput::Desired(desired_count)) +} + +#[signal("pegboard_serverless_bump")] +#[derive(Debug)] +pub struct Bump {} + +#[signal("pegboard_serverless_runner_drain_started")] +pub struct RunnerDrainStarted { + pub runner_wf_id: Id, +} + +join_signal!(Main { + Bump, + RunnerDrainStarted, +}); diff --git a/engine/packages/pegboard/src/workflows/serverless/runner.rs b/engine/packages/pegboard/src/workflows/serverless/runner.rs new file mode 100644 index 0000000000..6fd82161ce --- /dev/null +++ b/engine/packages/pegboard/src/workflows/serverless/runner.rs @@ -0,0 +1,83 @@ +use gas::prelude::*; + +use super::connection; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Input { + pub pool_wf_id: Id, + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct State { + pub is_draining: bool, +} + +impl State { + fn new() -> Self { + Self { is_draining: false } + } +} + +#[workflow] +pub async fn pegboard_serverless_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { + ctx.activity(InitStateInput {}).await?; + + let conn_wf_id = ctx + .workflow(connection::Input { + pool_wf_id: input.pool_wf_id, + runner_wf_id: 
ctx.workflow_id(), + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .dispatch() + .await?; + + ctx.listen::().await?; + + ctx.activity(MarkAsDrainingInput {}).await?; + + ctx.signal(connection::DrainSignal {}) + .to_workflow_id(conn_wf_id) + .send() + .await?; + + ctx.msg(connection::DrainMessage {}) + .tag("workflow_id", conn_wf_id) + .send() + .await?; + + // Wait for connection wf to complete so this wf's state remains readable + ctx.workflow::(conn_wf_id) + .output() + .await?; + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct InitStateInput {} + +#[activity(InitState)] +async fn init_state(ctx: &ActivityCtx, input: &InitStateInput) -> Result<()> { + let mut state = ctx.state::>()?; + + *state = Some(State::new()); + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct MarkAsDrainingInput {} + +#[activity(MarkAsDraining)] +async fn mark_as_draining(ctx: &ActivityCtx, input: &MarkAsDrainingInput) -> Result<()> { + let mut state = ctx.state::()?; + state.is_draining = true; + + Ok(()) +} + +#[signal("pegboard_serverless_runner_drain")] +pub struct Drain {} diff --git a/engine/packages/types/src/lib.rs b/engine/packages/types/src/lib.rs index ed3c57a928..c4de39bea6 100644 --- a/engine/packages/types/src/lib.rs +++ b/engine/packages/types/src/lib.rs @@ -1,7 +1,6 @@ pub mod actors; pub mod datacenters; pub mod keys; -pub mod msgs; pub mod namespaces; pub mod runner_configs; pub mod runners; diff --git a/engine/packages/types/src/msgs/mod.rs b/engine/packages/types/src/msgs/mod.rs deleted file mode 100644 index 38311a6f72..0000000000 --- a/engine/packages/types/src/msgs/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod pegboard; diff --git a/engine/packages/types/src/msgs/pegboard.rs b/engine/packages/types/src/msgs/pegboard.rs deleted file mode 100644 index 0ea2d1154c..0000000000 --- a/engine/packages/types/src/msgs/pegboard.rs +++ /dev/null @@ -1,5 +0,0 @@ -use gas::prelude::*; - -// TODO: Add 
namespace + runner name to this struct so bumps can be more targeted -#[message("pegboard_bump_serverless_autoscaler")] -pub struct BumpServerlessAutoscaler {} diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index 69bc8d5e0b..ff759d7b4e 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -1,4 +1,4 @@ - +import assert from "node:assert" import * as bare from "@bare-ts/lib" const DEFAULT_CONFIG = /* @__PURE__ */ bare.Config({}) @@ -1925,9 +1925,3 @@ export function decodeToServerlessServer(bytes: Uint8Array): ToServerlessServer } return result } - - -function assert(condition: boolean, message?: string): asserts condition { - if (!condition) throw new Error(message ?? "Assertion failed") -} - diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 24f828e45a..f209943b3a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1034,6 +1034,28 @@ importers: specifier: ^3.1.1 version: 3.2.4(@types/debug@4.1.12)(@types/node@22.18.1)(@vitest/ui@3.1.1)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.0) + examples/hono-serverless: + dependencies: + '@hono/node-server': + specifier: ^1.14.0 + version: 1.19.1(hono@4.9.8) + hono: + specifier: ^4.7.0 + version: 4.9.8 + devDependencies: + '@types/node': + specifier: ^22.13.9 + version: 22.18.1 + rivetkit: + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/rivetkit + tsx: + specifier: ^3.12.7 + version: 3.14.0 + typescript: + specifier: ^5.5.2 + version: 5.9.2 + examples/kitchen-sink: dependencies: '@rivetkit/react': diff --git a/rivetkit-openapi/openapi.json b/rivetkit-openapi/openapi.json index 90803b4757..97882a385f 100644 --- a/rivetkit-openapi/openapi.json +++ b/rivetkit-openapi/openapi.json @@ -664,4 +664,4 @@ } } } -} \ No newline at end of file +} diff --git a/scripts/run/engine-postgres-shell.sh 
b/scripts/run/docker/engine-postgres-shell.sh similarity index 92% rename from scripts/run/engine-postgres-shell.sh rename to scripts/run/docker/engine-postgres-shell.sh index 20ce44d64f..3900677bcc 100755 --- a/scripts/run/engine-postgres-shell.sh +++ b/scripts/run/docker/engine-postgres-shell.sh @@ -2,7 +2,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" cd "${REPO_ROOT}" diff --git a/scripts/run/docker/engine-postgres.sh b/scripts/run/docker/engine-postgres.sh index 054cd02841..569e59ca79 100755 --- a/scripts/run/docker/engine-postgres.sh +++ b/scripts/run/docker/engine-postgres.sh @@ -2,7 +2,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" if ! command -v nc >/dev/null 2>&1; then echo "error: required command 'nc' not found." diff --git a/scripts/run/docker/engine-rocksdb.sh b/scripts/run/docker/engine-rocksdb.sh index 42690862ba..2759cf5a69 100755 --- a/scripts/run/docker/engine-rocksdb.sh +++ b/scripts/run/docker/engine-rocksdb.sh @@ -2,7 +2,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." 
&& pwd)" cd "${REPO_ROOT}" @@ -13,4 +13,3 @@ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \ RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \ RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \ cargo run --bin rivet-engine -- start "$@" | tee -i /tmp/rivet-engine.log - diff --git a/scripts/run/nuke-rocksdb.sh b/scripts/run/docker/nuke-rocksdb.sh similarity index 66% rename from scripts/run/nuke-rocksdb.sh rename to scripts/run/docker/nuke-rocksdb.sh index 3423a19ab3..9a863c3c82 100755 --- a/scripts/run/nuke-rocksdb.sh +++ b/scripts/run/docker/nuke-rocksdb.sh @@ -2,8 +2,9 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" cd "${REPO_ROOT}" rm -rf "~/Library/Application Support/rivet-engine/" +rm -rf ls ~/.local/share/rivet-engine diff --git a/website/public/llms.txt b/website/public/llms.txt index 213416054c..5ee6326556 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -154,4 +154,4 @@ https://rivet.dev/rss/feed.xml https://rivet.dev/sales https://rivet.dev/support https://rivet.dev/talk-to-an-engineer -https://rivet.dev/terms \ No newline at end of file +https://rivet.dev/terms