From c20b8cd15c0ca561b9ef8ce4381291b87c8b1fb6 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 20 Nov 2025 11:37:46 -0800 Subject: [PATCH] fix(pb): stop actors from destroying on goingaway --- .../packages/guard-core/src/custom_serve.rs | 2 +- .../packages/guard-core/src/proxy_service.rs | 29 ++++++++++--------- .../pegboard-gateway/src/shared_state.rs | 4 +-- .../pegboard/src/workflows/actor/mod.rs | 24 +++++++-------- .../packages/pegboard/src/workflows/runner.rs | 2 +- 5 files changed, 31 insertions(+), 30 deletions(-) diff --git a/engine/packages/guard-core/src/custom_serve.rs b/engine/packages/guard-core/src/custom_serve.rs index 1dd684c723..5bc1dcbcf1 100644 --- a/engine/packages/guard-core/src/custom_serve.rs +++ b/engine/packages/guard-core/src/custom_serve.rs @@ -3,8 +3,8 @@ use async_trait::async_trait; use bytes::Bytes; use http_body_util::Full; use hyper::{Request, Response}; -use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use pegboard::tunnel::id::RequestId; +use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use crate::WebSocketHandle; use crate::proxy_service::ResponseBody; diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index 8caa024a2e..9c0f880620 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -612,19 +612,20 @@ impl ProxyState { let cache_key = (actor_id, ip_addr); // Get existing counter or create a new one - let counter_arc = - if let Some(existing_counter) = self.in_flight_counters.get(&cache_key).await { - existing_counter - } else { - let new_counter = Arc::new(Mutex::new(InFlightCounter::new( - middleware_config.max_in_flight.amount, - ))); - self.in_flight_counters - .insert(cache_key, new_counter.clone()) - .await; - metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]); - new_counter - }; + let counter_arc = if let Some(existing_counter) = + self.in_flight_counters.get(&cache_key).await + { + existing_counter + } else { + let new_counter = Arc::new(Mutex::new(InFlightCounter::new( + middleware_config.max_in_flight.amount, + ))); + self.in_flight_counters + .insert(cache_key, new_counter.clone()) + .await; + metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]); + new_counter + }; // Try to acquire from the counter let acquired = { @@ -638,7 +639,7 @@ impl ProxyState { } // Generate unique request ID - let request_id = Some(self.generate_unique_request_id().await?); + let request_id = Some(self.generate_unique_request_id().await?); Ok(request_id) } diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index 94baa44929..32b0dfe4a5 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -194,8 +194,8 @@ impl SharedState { hs.pending_ws_msgs.push(pending_ws_msg); tracing::debug!( - index=current_message_index, - new_count=hs.pending_ws_msgs.len(), + index = current_message_index, + new_count = hs.pending_ws_msgs.len(), "pushed pending websocket message" ); } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 4f555e3a5d..fea15c712f 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -569,10 +569,10 @@ async fn handle_stopped( tracing::debug!(?variant, "actor stopped"); let force_reschedule = match &variant { - // Reset retry count on successful exit StoppedVariant::Normal { code: protocol::StopCode::Ok, } => { + // Reset retry count on successful exit state.reschedule_state = Default::default(); false @@ -583,7 +583,6 @@ async fn handle_stopped( // Clear stop gc timeout to prevent being marked as lost in the lifecycle loop state.gc_timeout_ts = None; - state.going_away = false; state.stopping = false; state.runner_id = None; let old_runner_workflow_id = state.runner_workflow_id.take(); @@ -658,16 +657,16 @@ async fn handle_stopped( } // Handle rescheduling if not marked as sleeping else if !state.sleeping { - // Anything besides a StopCode::Ok is considered a failure - let failed = !matches!( - variant, - StoppedVariant::Normal { - code: protocol::StopCode::Ok - } - ); + let graceful_exit = !state.going_away + && matches!( + variant, + StoppedVariant::Normal { + code: protocol::StopCode::Ok + } + ); - match (input.crash_policy, failed) { - (CrashPolicy::Restart, true) => { + match (input.crash_policy, graceful_exit) { + (CrashPolicy::Restart, false) => { match runtime::reschedule_actor(ctx, &input, state, false).await? { runtime::SpawnActorOutput::Allocated { .. } => {} // NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash @@ -678,7 +677,7 @@ async fn handle_stopped( } } } - (CrashPolicy::Sleep, true) => { + (CrashPolicy::Sleep, false) => { tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to crash"); state.sleeping = true; @@ -711,6 +710,7 @@ async fn handle_stopped( state.wake_for_alarm = false; state.will_wake = false; + state.going_away = false; ctx.msg(Stopped {}) .tag("actor_id", input.actor_id) diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index b87ef3acc5..7ca8c14578 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -2,7 +2,7 @@ use futures_util::{FutureExt, StreamExt, TryStreamExt}; use gas::prelude::*; use rivet_data::converted::{ActorNameKeyData, MetadataKeyData, RunnerByKeyKeyData}; use rivet_metrics::KeyValue; -use rivet_runner_protocol::{self as protocol, versioned, PROTOCOL_VERSION}; +use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned}; use universaldb::{ options::{ConflictRangeType, StreamingMode}, utils::{FormalChunkedKey, IsolationLevel::*},