From dc9084a7aff24fadee584cb2650f7cf4f0102cdd Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 18 Nov 2025 15:37:28 -0800 Subject: [PATCH 1/2] fix(guard): handle actor stopped during ws/req --- engine/packages/pegboard-gateway/src/lib.rs | 38 ++++++++++++++++++- .../pegboard-gateway/src/shared_state.rs | 12 ++++++ engine/packages/pegboard-runner/src/lib.rs | 29 +++++--------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index b1e53cd11d..a2f2cd7a55 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -152,6 +152,11 @@ impl CustomServeTrait for PegboardGateway { .context("failed to read body")? .to_bytes(); + let mut stopped_sub = self + .ctx + .subscribe::(("actor_id", self.actor_id)) + .await?; + // Build subject to publish to let tunnel_subject = pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string(); @@ -212,6 +217,10 @@ impl CustomServeTrait for PegboardGateway { break; } } + _ = stopped_sub.next() => { + tracing::debug!("actor stopped while waiting for request response"); + return Err(ServiceUnavailable.build()); + } _ = drop_rx.changed() => { tracing::warn!("tunnel message timeout"); return Err(ServiceUnavailable.build()); @@ -278,6 +287,11 @@ impl CustomServeTrait for PegboardGateway { } } + let mut stopped_sub = self + .ctx + .subscribe::(("actor_id", self.actor_id)) + .await?; + // Build subject to publish to let tunnel_subject = pegboard::pubsub_subjects::RunnerReceiverSubject::new(self.runner_id).to_string(); @@ -339,6 +353,10 @@ impl CustomServeTrait for PegboardGateway { break; } } + _ = stopped_sub.next() => { + tracing::debug!("actor stopped while waiting for websocket open"); + return Err(WebSocketServiceUnavailable.build()); + } _ = drop_rx.changed() => { tracing::warn!("websocket open timeout"); return Err(WebSocketServiceUnavailable.build()); @@ -364,7 +382,7 @@ impl CustomServeTrait for PegboardGateway { open_msg.can_hibernate }; - // Send reclaimed messages + // Send pending messages self.shared_state .resend_pending_websocket_messages(request_id) .await?; @@ -415,6 +433,15 @@ impl CustomServeTrait for PegboardGateway { return Err(WebSocketServiceHibernate.build()); } } + _ = stopped_sub.next() => { + tracing::debug!("actor stopped during websocket handler loop"); + + if can_hibernate { + return Err(WebSocketServiceHibernate.build()); + } else { + return Err(WebSocketServiceUnavailable.build()); + } + } _ = drop_rx.changed() => { tracing::warn!("websocket message timeout"); return Err(WebSocketServiceTimeout.build()); @@ -579,6 +606,15 @@ impl CustomServeTrait for PegboardGateway { client_ws: WebSocketHandle, unique_request_id: Uuid, ) -> Result { + // Immediately rewake if we have pending messages + if self + .shared_state + .has_pending_websocket_messages(unique_request_id.into_bytes()) + .await? + { + return Ok(HibernationResult::Continue); + } + // Start keepalive task let ctx = self.ctx.clone(); let actor_id = self.actor_id; diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index 8b6aabb8cb..527fe0d092 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -344,6 +344,18 @@ impl SharedState { Ok(()) } + pub async fn has_pending_websocket_messages(&self, request_id: RequestId) -> Result { + let Some(req) = self.in_flight_requests.get_async(&request_id).await else { + bail!("request not in flight"); + }; + + if let Some(hs) = &req.hibernation_state { + Ok(!hs.pending_ws_msgs.is_empty()) + } else { + Ok(false) + } + } + pub async fn ack_pending_websocket_messages( &self, request_id: RequestId, diff --git a/engine/packages/pegboard-runner/src/lib.rs b/engine/packages/pegboard-runner/src/lib.rs index 247916044c..05e72fe8f0 100644 --- a/engine/packages/pegboard-runner/src/lib.rs +++ b/engine/packages/pegboard-runner/src/lib.rs @@ -12,7 +12,7 @@ use rivet_guard_core::{ use rivet_runner_protocol as protocol; use std::time::Duration; use tokio::sync::watch; -use tokio_tungstenite::tungstenite::protocol::frame::{CloseFrame, coding::CloseCode}; +use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use universalpubsub::PublishOpts; use vbare::OwnedVersionedData; @@ -243,30 +243,19 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe { ); } - // Send WebSocket close messages to all remaining active requests + // Send close messages to all remaining active requests let active_requests = conn.tunnel_active_requests.lock().await; for (request_id, req) in &*active_requests { - let close_msg_kind = if req.is_ws { - let (close_code, close_reason) = if lifecycle_res.is_ok() { - (CloseCode::Normal.into(), None) - } else { - (CloseCode::Error.into(), Some("ws.upstream_closed".into())) - }; - - protocol::ToServerTunnelMessageKind::ToServerWebSocketClose( - protocol::ToServerWebSocketClose { - code: Some(close_code), - reason: close_reason, - hibernate: true, - }, - ) - } else { - protocol::ToServerTunnelMessageKind::ToServerResponseAbort - }; + // Websockets are not ephemeral like requests. If the runner ws closes they are not informed; + // instead they wait for the actor itself to stop. + if req.is_ws { + continue; + } + let close_message = protocol::ToServerTunnelMessage { request_id: request_id.clone(), message_id: Uuid::new_v4().into_bytes(), - message_kind: close_msg_kind, + message_kind: protocol::ToServerTunnelMessageKind::ToServerResponseAbort, }; let msg_serialized = protocol::versioned::ToGateway::wrap_latest(protocol::ToGateway { From c971e5fa18f73c51f505f738bf35c6b4b44cc369 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 18 Nov 2025 17:28:08 -0800 Subject: [PATCH 2/2] temp --- .../packages/guard-core/src/proxy_service.rs | 2 +- engine/packages/pegboard-gateway/src/lib.rs | 49 ++++++++++--------- website/public/llms-full.txt | 27 +--------- website/public/llms.txt | 2 + 4 files changed, 31 insertions(+), 49 deletions(-) diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index 25831626ec..161b073ab7 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -2537,7 +2537,7 @@ fn is_retryable_ws_error(err: &anyhow::Error) -> bool { } } -fn is_ws_hibernate(err: &anyhow::Error) -> bool { +pub fn is_ws_hibernate(err: &anyhow::Error) -> bool { if let Some(rivet_err) = err.chain().find_map(|x| x.downcast_ref::()) { rivet_err.group() == "guard" && rivet_err.code() == "websocket_service_hibernate" } else { diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index a2f2cd7a55..b0d24c9976 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -14,7 +14,7 @@ use rivet_guard_core::{ ServiceUnavailable, WebSocketServiceHibernate, WebSocketServiceTimeout, WebSocketServiceUnavailable, }, - proxy_service::ResponseBody, + proxy_service::{ResponseBody, is_ws_hibernate}, request_context::RequestContext, websocket_handle::WebSocketReceiver, }; @@ -559,28 +559,33 @@ impl CustomServeTrait for PegboardGateway { (res, _) => res, }; - // Send WebSocket close message to runner - let (close_code, close_reason) = match &mut lifecycle_res { - // Taking here because it won't be used again - Ok(LifecycleResult::ClientClose(Some(close))) => { - (close.code, Some(std::mem::take(&mut close.reason))) - } - Ok(_) => (CloseCode::Normal.into(), None), - Err(_) => (CloseCode::Error.into(), Some("ws.downstream_closed".into())), - }; - let close_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketClose( - protocol::ToClientWebSocketClose { - code: Some(close_code.into()), - reason: close_reason.map(|x| x.as_str().to_string()), - }, - ); - - if let Err(err) = self - .shared_state - .send_message(request_id, close_message) - .await + // Send close frame to runner if not hibernating + if lifecycle_res + .as_ref() + .map_or_else(is_ws_hibernate, |_| false) { - tracing::error!(?err, "error sending close message"); + let (close_code, close_reason) = match &mut lifecycle_res { + // Taking here because it won't be used again + Ok(LifecycleResult::ClientClose(Some(close))) => { + (close.code, Some(std::mem::take(&mut close.reason))) + } + Ok(_) => (CloseCode::Normal.into(), None), + Err(_) => (CloseCode::Error.into(), Some("ws.downstream_closed".into())), + }; + let close_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketClose( + protocol::ToClientWebSocketClose { + code: Some(close_code.into()), + reason: close_reason.map(|x| x.as_str().to_string()), + }, + ); + + if let Err(err) = self + .shared_state + .send_message(request_id, close_message) + .await + { + tracing::error!(?err, "error sending close message"); + } } // Send WebSocket close message to client diff --git a/website/public/llms-full.txt b/website/public/llms-full.txt index 326df2442c..2bad7ad3be 100644 --- a/website/public/llms-full.txt +++ b/website/public/llms-full.txt @@ -182,32 +182,7 @@ See [helper types](/docs/actors/helper-types) for more details on using `ActionC # API Reference -For comprehensive API documentation, please refer to the TypeDoc generated documentation for each package: - -## Core Packages - -- [**rivetkit**](/typedoc/rivetkit/) - Main RivetKit package with full actor framework -- [**@rivetkit/actor**](/typedoc/actor/) - Core actor primitives -- [**@rivetkit/core**](/typedoc/core/) - Core utilities and types - -## Platform Adapters - -- [**@rivetkit/cloudflare-workers**](/typedoc/cloudflare-workers/) - Cloudflare Workers adapter -- [**@rivetkit/next-js**](/typedoc/next-js/) - Next.js integration -- [**@rivetkit/react**](/typedoc/react/) - React hooks and components - -## Additional Packages - -- [**@rivetkit/db**](/typedoc/db/) - Database integration utilities - ---- - -The TypeDoc documentation provides detailed information about: - -- All exported types and interfaces -- Function signatures and parameters -- Class methods and properties -- Return types and examples +For comprehensive API documentation, please refer to the [TypeDoc generated documentation](/typedoc/). ## Authentication # Authentication diff --git a/website/public/llms.txt b/website/public/llms.txt index 213416054c..93aa0e255a 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -29,6 +29,7 @@ https://rivet.dev/blog/2025-10-20-how-we-built-websocket-servers-for-vercel-func https://rivet.dev/blog/2025-10-20-weekly-updates https://rivet.dev/blog/2025-10-24-weekly-updates https://rivet.dev/blog/2025-11-02-weekly-updates +https://rivet.dev/blog/2025-11-09-weekly-updates https://rivet.dev/blog/godot-multiplayer-compared-to-unity https://rivet.dev/changelog https://rivet.dev/changelog.json @@ -58,6 +59,7 @@ https://rivet.dev/changelog/2025-10-20-how-we-built-websocket-servers-for-vercel https://rivet.dev/changelog/2025-10-20-weekly-updates https://rivet.dev/changelog/2025-10-24-weekly-updates https://rivet.dev/changelog/2025-11-02-weekly-updates +https://rivet.dev/changelog/2025-11-09-weekly-updates https://rivet.dev/changelog/godot-multiplayer-compared-to-unity https://rivet.dev/cloud https://rivet.dev/docs/actors