Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 3 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

[workspace]
resolver = "2"
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/internal","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pegboard-serverless","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]

[workspace.package]
version = "2.0.24-rc.1"
Expand Down Expand Up @@ -335,9 +335,6 @@ path = "engine/packages/guard"
[workspace.dependencies.rivet-guard-core]
path = "engine/packages/guard-core"

[workspace.dependencies.internal]
path = "engine/packages/internal"

[workspace.dependencies.rivet-logs]
path = "engine/packages/logs"

Expand All @@ -356,9 +353,6 @@ path = "engine/packages/pegboard-gateway"
[workspace.dependencies.pegboard-runner]
path = "engine/packages/pegboard-runner"

[workspace.dependencies.pegboard-serverless]
path = "engine/packages/pegboard-serverless"

[workspace.dependencies.rivet-pools]
path = "engine/packages/pools"

Expand Down
5 changes: 5 additions & 0 deletions engine/artifacts/errors/serverless_runner_pool.not_found.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions engine/packages/api-peer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ serde.workspace = true
serde_json.workspace = true
indexmap.workspace = true

tokio.workspace = true
tracing.workspace = true
namespace.workspace = true
pegboard.workspace = true
pegboard-actor-kv.workspace = true
tokio.workspace = true
tracing.workspace = true
universalpubsub.workspace = true
uuid.workspace = true
utoipa.workspace = true
17 changes: 0 additions & 17 deletions engine/packages/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,6 @@ pub async fn cache_purge(
Ok(CachePurgeResponse {})
}

#[derive(Serialize)]
#[serde(deny_unknown_fields)]
pub struct BumpServerlessAutoscalerResponse {}

pub async fn bump_serverless_autoscaler(
ctx: ApiCtx,
_path: (),
_query: (),
_body: (),
) -> Result<BumpServerlessAutoscalerResponse> {
ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
.send()
.await?;

Ok(BumpServerlessAutoscalerResponse {})
}

#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SetTracingConfigRequest {
Expand Down
4 changes: 0 additions & 4 deletions engine/packages/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ pub async fn router(
.route("/runners/names", get(runners::list_names))
// MARK: Internal
.route("/cache/purge", post(internal::cache_purge))
.route(
"/bump-serverless-autoscaler",
post(internal::bump_serverless_autoscaler),
)
.route(
"/epoxy/coordinator/replica-reconfigure",
post(internal::epoxy_replica_reconfigure),
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/api-peer/src/runner_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List

if let Some(runner_names) = query.runner_names {
let runner_configs = ctx
.op(namespace::ops::runner_config::get::Input {
.op(pegboard::ops::runner_config::get::Input {
runners: runner_names
.split(',')
.map(|name| (namespace.namespace_id, name.to_string()))
Expand Down Expand Up @@ -54,7 +54,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<List
};

let runner_configs = ctx
.op(namespace::ops::runner_config::list::Input {
.op(pegboard::ops::runner_config::list::Input {
namespace_id: namespace.namespace_id,
variant,
after_name,
Expand Down Expand Up @@ -112,7 +112,7 @@ pub async fn upsert(
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let endpoint_config_changed = ctx
.op(namespace::ops::runner_config::upsert::Input {
.op(pegboard::ops::runner_config::upsert::Input {
namespace_id: namespace.namespace_id,
name: path.runner_name,
config: body.0.into(),
Expand Down Expand Up @@ -150,7 +150,7 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

ctx.op(namespace::ops::runner_config::delete::Input {
ctx.op(pegboard::ops::runner_config::delete::Input {
namespace_id: namespace.namespace_id,
name: path.runner_name,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn refresh_metadata_inner(
.collect();

let runner_configs = ctx
.op(namespace::ops::runner_config::get::Input {
.op(pegboard::ops::runner_config::get::Input {
runners,
bypass_cache: true,
})
Expand Down
32 changes: 32 additions & 0 deletions engine/packages/config/src/config/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@ pub struct Pegboard {
///
/// **Experimental**
pub hibernating_request_eligible_threshold: Option<i64>,
/// Time to delay a serverless runner from attempting a new outbound connection after a connection failure.
///
/// Unit is in milliseconds.
///
/// **Experimental**
pub serverless_base_retry_timeout: Option<usize>,
/// How long a serverless runner goes without connection failures before it's retry count is reset to 0,
/// effectively resetting its backoff to 0.
///
/// Unit is in milliseconds.
///
/// **Experimental**
pub serverless_retry_reset_duration: Option<i64>,
/// Maximum exponent for the serverless backoff calculation.
///
/// This controls the maximum backoff duration when serverlessly connecting to runners.
///
/// **Experimental**
pub serverless_backoff_max_exponent: Option<usize>,
}

impl Pegboard {
Expand Down Expand Up @@ -91,4 +110,17 @@ impl Pegboard {
self.hibernating_request_eligible_threshold
.unwrap_or(90_000)
}

pub fn serverless_base_retry_timeout(&self) -> usize {
self.serverless_base_retry_timeout.unwrap_or(2000)
}

pub fn serverless_retry_reset_duration(&self) -> i64 {
self.serverless_retry_reset_duration
.unwrap_or(10 * 60 * 1000)
}

pub fn serverless_backoff_max_exponent(&self) -> usize {
self.serverless_backoff_max_exponent.unwrap_or(8)
}
}
1 change: 0 additions & 1 deletion engine/packages/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ include_dir.workspace = true
indoc.workspace = true
lz4_flex.workspace = true
pegboard-runner.workspace = true
pegboard-serverless.workspace = true
reqwest.workspace = true
rivet-api-peer.workspace = true
rivet-bootstrap.workspace = true
Expand Down
7 changes: 0 additions & 7 deletions engine/packages/engine/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
|config, pools| Box::pin(rivet_bootstrap::start(config, pools)),
false,
),
Service::new(
"pegboard_serverless",
// There should only be one of these, since it's auto-scaling requests
ServiceKind::Singleton,
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
false,
),
// Core services
Service::new(
"tracing_reconfigure",
Expand Down
33 changes: 31 additions & 2 deletions engine/packages/gasoline/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ use tokio::sync::Mutex;
use tracing::Instrument;

use crate::{
builder::common as builder,
ctx::{
common,
message::{MessageCtx, SubscriptionHandle},
},
db::DatabaseHandle,
db::{DatabaseHandle, WorkflowData},
error::{WorkflowError, WorkflowResult},
message::Message,
operation::{Operation, OperationInput},
signal::Signal,
utils::tags::AsTags,
workflow::StateGuard,
workflow::{StateGuard, Workflow},
};

pub struct ActivityCtx {
Expand Down Expand Up @@ -77,6 +79,33 @@ impl ActivityCtx {
}

impl ActivityCtx {
/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all, ret(Debug), fields(workflow_name=W::NAME))]
pub async fn find_workflow<W: Workflow>(&self, tags: impl AsTags) -> Result<Option<Id>> {
common::find_workflow::<W>(&self.db, tags)
.in_current_span()
.await
}

/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all)]
pub async fn get_workflows(&self, workflow_ids: Vec<Id>) -> Result<Vec<WorkflowData>> {
common::get_workflows(&self.db, workflow_ids)
.in_current_span()
.await
}

/// Creates a signal builder.
pub fn signal<T: Signal + Serialize>(&self, body: T) -> builder::signal::SignalBuilder<T> {
builder::signal::SignalBuilder::new(
self.db.clone(),
self.config.clone(),
self.ray_id,
body,
true,
)
}

#[tracing::instrument(skip_all)]
pub fn state<T: Serialize + DeserializeOwned>(&self) -> Result<StateGuard<'_, T>> {
if self.parallelized {
Expand Down
1 change: 0 additions & 1 deletion engine/packages/gasoline/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ impl OperationCtx {

/// Creates a signal builder.
pub fn signal<T: Signal + Serialize>(&self, body: T) -> builder::signal::SignalBuilder<T> {
// TODO: Add check for from_workflow so you cant dispatch a signal
builder::signal::SignalBuilder::new(
self.db.clone(),
self.config.clone(),
Expand Down
4 changes: 4 additions & 0 deletions engine/packages/gasoline/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ impl WorkflowData {
.transpose()
.map_err(WorkflowError::DeserializeWorkflowOutput)
}

pub fn has_output(&self) -> bool {
self.output.is_some()
}
}

#[derive(Debug)]
Expand Down
13 changes: 0 additions & 13 deletions engine/packages/internal/Cargo.toml

This file was deleted.

1 change: 0 additions & 1 deletion engine/packages/internal/README.md

This file was deleted.

1 change: 0 additions & 1 deletion engine/packages/internal/src/lib.rs

This file was deleted.

Loading
Loading